HBASE-6802 Export Snapshot (Matteo Bertozzi)
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445787 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0bcb524581
commit
378e6f62b5
|
@ -243,6 +243,17 @@ public class SnapshotDescriptionUtils {
|
||||||
snapshot.getName());
|
snapshot.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the directory to build a snapshot, before it is finalized
|
||||||
|
* @param snapshotName name of the snapshot
|
||||||
|
* @param rootDir root directory of the hbase installation
|
||||||
|
* @return {@link Path} where one can build a snapshot
|
||||||
|
*/
|
||||||
|
public static Path getWorkingSnapshotDir(String snapshotName, final Path rootDir) {
|
||||||
|
return getCompletedSnapshotDir(new Path(getSnapshotsDir(rootDir), SNAPSHOT_TMP_DIR_NAME),
|
||||||
|
snapshotName);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the directory to store the snapshot instance
|
* Get the directory to store the snapshot instance
|
||||||
* @param snapshotsDir hbase-global directory for storing all snapshots
|
* @param snapshotsDir hbase-global directory for storing all snapshots
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.snapshot.exception;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown when a snapshot could not be exported due to an error during the operation.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
public class ExportSnapshotException extends HBaseSnapshotException {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param msg message describing the exception
|
||||||
|
*/
|
||||||
|
public ExportSnapshotException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param message message describing the exception
|
||||||
|
* @param e cause
|
||||||
|
*/
|
||||||
|
public ExportSnapshotException(String message, Exception e) {
|
||||||
|
super(message, e);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,714 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.snapshot.tool;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
|
import org.apache.hadoop.io.NullWritable;
|
||||||
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.util.Tool;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.io.HFileLink;
|
||||||
|
import org.apache.hadoop.hbase.io.HLogLink;
|
||||||
|
import org.apache.hadoop.hbase.io.Reference;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.JobUtil;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||||
|
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||||
|
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
|
||||||
|
import org.apache.hadoop.hbase.snapshot.exception.ExportSnapshotException;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Export the specified snapshot to a given FileSystem.
|
||||||
|
*
|
||||||
|
* The .snapshot/name folder is copied to the destination cluster
|
||||||
|
* and then all the hfiles/hlogs are copied using a Map-Reduce Job in the .archive/ location.
|
||||||
|
* When everything is done, the second cluster can restore the snapshot.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public final class ExportSnapshot extends Configured implements Tool {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
|
||||||
|
|
||||||
|
private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
|
||||||
|
private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
|
||||||
|
private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
|
||||||
|
private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
|
||||||
|
private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
|
||||||
|
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
|
||||||
|
|
||||||
|
private static final String INPUT_FOLDER_PREFIX = "export-files.";
|
||||||
|
|
||||||
|
// Export Map-Reduce Counters, to keep track of the progress
|
||||||
|
public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED };
|
||||||
|
|
||||||
|
private static class ExportMapper extends Mapper<Text, NullWritable, NullWritable, NullWritable> {
|
||||||
|
final static int REPORT_SIZE = 1 * 1024 * 1024;
|
||||||
|
final static int BUFFER_SIZE = 64 * 1024;
|
||||||
|
|
||||||
|
private boolean verifyChecksum;
|
||||||
|
private String filesGroup;
|
||||||
|
private String filesUser;
|
||||||
|
private short filesMode;
|
||||||
|
|
||||||
|
private FileSystem outputFs;
|
||||||
|
private Path outputArchive;
|
||||||
|
private Path outputRoot;
|
||||||
|
|
||||||
|
private FileSystem inputFs;
|
||||||
|
private Path inputArchive;
|
||||||
|
private Path inputRoot;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setup(Context context) {
|
||||||
|
Configuration conf = context.getConfiguration();
|
||||||
|
verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
|
||||||
|
|
||||||
|
filesGroup = conf.get(CONF_FILES_GROUP);
|
||||||
|
filesUser = conf.get(CONF_FILES_USER);
|
||||||
|
filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
|
||||||
|
outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
|
||||||
|
inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
|
||||||
|
|
||||||
|
inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
|
||||||
|
outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
|
||||||
|
|
||||||
|
try {
|
||||||
|
inputFs = FileSystem.get(inputRoot.toUri(), conf);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("Could not get the input FileSystem with root=" + inputRoot, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
outputFs = FileSystem.get(outputRoot.toUri(), conf);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("Could not get the output FileSystem with root="+ outputRoot, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanup(Context context) {
|
||||||
|
if (outputFs != null) {
|
||||||
|
try {
|
||||||
|
outputFs.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Error closing output FileSystem", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inputFs != null) {
|
||||||
|
try {
|
||||||
|
inputFs.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Error closing input FileSystem", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void map(Text key, NullWritable value, Context context)
|
||||||
|
throws InterruptedException, IOException {
|
||||||
|
Path inputPath = new Path(key.toString());
|
||||||
|
Path outputPath = getOutputPath(inputPath);
|
||||||
|
|
||||||
|
LOG.info("copy file input=" + inputPath + " output=" + outputPath);
|
||||||
|
if (copyFile(context, inputPath, outputPath)) {
|
||||||
|
LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the location where the inputPath will be copied.
|
||||||
|
* - hfiles are encoded as hfile links hfile-region-table
|
||||||
|
* - logs are encoded as serverName/logName
|
||||||
|
*/
|
||||||
|
private Path getOutputPath(final Path inputPath) throws IOException {
|
||||||
|
Path path;
|
||||||
|
if (HFileLink.isHFileLink(inputPath)) {
|
||||||
|
String family = inputPath.getParent().getName();
|
||||||
|
String table = HFileLink.getReferencedTableName(inputPath.getName());
|
||||||
|
String region = HFileLink.getReferencedRegionName(inputPath.getName());
|
||||||
|
String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
|
||||||
|
path = new Path(table, new Path(region, new Path(family, hfile)));
|
||||||
|
} else if (isHLogLinkPath(inputPath)) {
|
||||||
|
String logName = inputPath.getName();
|
||||||
|
path = new Path(new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME), logName);
|
||||||
|
} else {
|
||||||
|
path = inputPath;
|
||||||
|
}
|
||||||
|
return new Path(outputArchive, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean copyFile(final Context context, final Path inputPath, final Path outputPath)
|
||||||
|
throws IOException {
|
||||||
|
FSDataInputStream in = openSourceFile(inputPath);
|
||||||
|
if (in == null) {
|
||||||
|
context.getCounter(Counter.MISSING_FILES).increment(1);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Verify if the input file exists
|
||||||
|
FileStatus inputStat = getFileStatus(inputFs, inputPath);
|
||||||
|
if (inputStat == null) return false;
|
||||||
|
|
||||||
|
// Verify if the output file exists and is the same that we want to copy
|
||||||
|
FileStatus outputStat = getFileStatus(outputFs, outputPath);
|
||||||
|
if (outputStat != null && sameFile(inputStat, outputStat)) {
|
||||||
|
LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file.");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
|
||||||
|
|
||||||
|
// Ensure that the output folder is there and copy the file
|
||||||
|
outputFs.mkdirs(outputPath.getParent());
|
||||||
|
FSDataOutputStream out = outputFs.create(outputPath, true);
|
||||||
|
try {
|
||||||
|
if (!copyData(context, inputPath, in, outputPath, out, inputStat.getLen()))
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Preserve attributes
|
||||||
|
return preserveAttributes(outputPath, inputStat);
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Preserve the files attribute selected by the user copying them from the source file
|
||||||
|
*/
|
||||||
|
private boolean preserveAttributes(final Path path, final FileStatus refStat) {
|
||||||
|
FileStatus stat;
|
||||||
|
try {
|
||||||
|
stat = outputFs.getFileStatus(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to get the status for file=" + path);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
|
||||||
|
outputFs.setPermission(path, new FsPermission(filesMode));
|
||||||
|
} else if (!stat.getPermission().equals(refStat.getPermission())) {
|
||||||
|
outputFs.setPermission(path, refStat.getPermission());
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to set the permission for file=" + path, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
String user = (filesUser != null) ? filesUser : refStat.getOwner();
|
||||||
|
String group = (filesGroup != null) ? filesGroup : refStat.getGroup();
|
||||||
|
if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
|
||||||
|
outputFs.setOwner(path, user, group);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to set the owner/group for file=" + path, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean copyData(final Context context,
|
||||||
|
final Path inputPath, final FSDataInputStream in,
|
||||||
|
final Path outputPath, final FSDataOutputStream out,
|
||||||
|
final long inputFileSize) {
|
||||||
|
final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
|
||||||
|
" (%.3f%%) from " + inputPath + " to " + outputPath;
|
||||||
|
|
||||||
|
try {
|
||||||
|
byte[] buffer = new byte[BUFFER_SIZE];
|
||||||
|
long totalBytesWritten = 0;
|
||||||
|
int reportBytes = 0;
|
||||||
|
int bytesRead;
|
||||||
|
|
||||||
|
while ((bytesRead = in.read(buffer)) > 0) {
|
||||||
|
out.write(buffer, 0, bytesRead);
|
||||||
|
totalBytesWritten += bytesRead;
|
||||||
|
reportBytes += bytesRead;
|
||||||
|
|
||||||
|
if (reportBytes >= REPORT_SIZE) {
|
||||||
|
context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
|
||||||
|
context.setStatus(String.format(statusMessage,
|
||||||
|
StringUtils.humanReadableInt(totalBytesWritten),
|
||||||
|
reportBytes/(float)inputFileSize));
|
||||||
|
reportBytes = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
|
||||||
|
context.setStatus(String.format(statusMessage,
|
||||||
|
StringUtils.humanReadableInt(totalBytesWritten),
|
||||||
|
reportBytes/(float)inputFileSize));
|
||||||
|
|
||||||
|
// Verify that the written size match
|
||||||
|
if (totalBytesWritten != inputFileSize) {
|
||||||
|
LOG.error("number of bytes copied not matching copied=" + totalBytesWritten +
|
||||||
|
" expected=" + inputFileSize + " for file=" + inputPath);
|
||||||
|
context.getCounter(Counter.COPY_FAILED).increment(1);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Error copying " + inputPath + " to " + outputPath, e);
|
||||||
|
context.getCounter(Counter.COPY_FAILED).increment(1);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private FSDataInputStream openSourceFile(final Path path) {
|
||||||
|
try {
|
||||||
|
if (HFileLink.isHFileLink(path)) {
|
||||||
|
return new HFileLink(inputRoot, inputArchive, path).open(inputFs);
|
||||||
|
} else if (isHLogLinkPath(path)) {
|
||||||
|
String serverName = path.getParent().getName();
|
||||||
|
String logName = path.getName();
|
||||||
|
return new HLogLink(inputRoot, serverName, logName).open(inputFs);
|
||||||
|
}
|
||||||
|
return inputFs.open(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to open source file=" + path, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileStatus getFileStatus(final FileSystem fs, final Path path) {
|
||||||
|
try {
|
||||||
|
if (HFileLink.isHFileLink(path)) {
|
||||||
|
Path refPath = HFileLink.getReferencedPath(fs, inputRoot, inputArchive, path);
|
||||||
|
return fs.getFileStatus(refPath);
|
||||||
|
} else if (isHLogLinkPath(path)) {
|
||||||
|
String serverName = path.getParent().getName();
|
||||||
|
String logName = path.getName();
|
||||||
|
return new HLogLink(inputRoot, serverName, logName).getFileStatus(fs);
|
||||||
|
}
|
||||||
|
return fs.getFileStatus(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to get the status for file=" + path);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
|
||||||
|
try {
|
||||||
|
return fs.getFileChecksum(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to get checksum for file=" + path, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the two files are equal by looking at the file length,
|
||||||
|
* and at the checksum (if user has specified the verifyChecksum flag).
|
||||||
|
*/
|
||||||
|
private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
|
||||||
|
// Not matching length
|
||||||
|
if (inputStat.getLen() != outputStat.getLen()) return false;
|
||||||
|
|
||||||
|
// Mark files as equals, since user asked for no checksum verification
|
||||||
|
if (!verifyChecksum) return true;
|
||||||
|
|
||||||
|
// If checksums are not available, files are not the same.
|
||||||
|
FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
|
||||||
|
if (inChecksum == null) return false;
|
||||||
|
|
||||||
|
FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
|
||||||
|
if (outChecksum == null) return false;
|
||||||
|
|
||||||
|
return inChecksum.equals(outChecksum);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HLog files are encoded as serverName/logName
|
||||||
|
* and since all the other files should be in /hbase/table/..path..
|
||||||
|
* we can rely on the depth, for now.
|
||||||
|
*/
|
||||||
|
private static boolean isHLogLinkPath(final Path path) {
|
||||||
|
return path.depth() == 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
|
||||||
|
* @return list of files referenced by the snapshot (pair of path and size)
|
||||||
|
*/
|
||||||
|
private List<Pair<Path, Long>> getSnapshotFiles(final FileSystem fs, final Path snapshotDir) throws IOException {
|
||||||
|
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
|
||||||
|
|
||||||
|
final List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
|
||||||
|
final String table = snapshotDesc.getTable();
|
||||||
|
final Configuration conf = getConf();
|
||||||
|
|
||||||
|
// Get snapshot files
|
||||||
|
SnapshotReferenceUtil.visitReferencedFiles(fs, snapshotDir,
|
||||||
|
new SnapshotReferenceUtil.FileVisitor() {
|
||||||
|
public void storeFile (final String region, final String family, final String hfile)
|
||||||
|
throws IOException {
|
||||||
|
Path path = new Path(family, HFileLink.createHFileLinkName(table, region, hfile));
|
||||||
|
long size = fs.getFileStatus(HFileLink.getReferencedPath(conf, fs, path)).getLen();
|
||||||
|
files.add(new Pair<Path, Long>(path, size));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recoveredEdits (final String region, final String logfile)
|
||||||
|
throws IOException {
|
||||||
|
// copied with the snapshot referenecs
|
||||||
|
}
|
||||||
|
|
||||||
|
public void logFile (final String server, final String logfile)
|
||||||
|
throws IOException {
|
||||||
|
long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
|
||||||
|
files.add(new Pair<Path, Long>(new Path(server, logfile), size));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
|
||||||
|
* The groups created will have similar amounts of bytes.
|
||||||
|
* <p>
|
||||||
|
* The algorithm used is pretty straightforward; the file list is sorted by size,
|
||||||
|
* and then each group fetch the bigger file available, iterating through groups
|
||||||
|
* alternating the direction.
|
||||||
|
*/
|
||||||
|
static List<List<Path>> getBalancedSplits(final List<Pair<Path, Long>> files, int ngroups) {
|
||||||
|
// Sort files by size, from small to big
|
||||||
|
Collections.sort(files, new Comparator<Pair<Path, Long>>() {
|
||||||
|
public int compare(Pair<Path, Long> a, Pair<Path, Long> b) {
|
||||||
|
long r = a.getSecond() - b.getSecond();
|
||||||
|
return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// create balanced groups
|
||||||
|
List<List<Path>> fileGroups = new LinkedList<List<Path>>();
|
||||||
|
long[] sizeGroups = new long[ngroups];
|
||||||
|
int hi = files.size() - 1;
|
||||||
|
int lo = 0;
|
||||||
|
|
||||||
|
List<Path> group;
|
||||||
|
int dir = 1;
|
||||||
|
int g = 0;
|
||||||
|
|
||||||
|
while (hi >= lo) {
|
||||||
|
if (g == fileGroups.size()) {
|
||||||
|
group = new LinkedList<Path>();
|
||||||
|
fileGroups.add(group);
|
||||||
|
} else {
|
||||||
|
group = fileGroups.get(g);
|
||||||
|
}
|
||||||
|
|
||||||
|
Pair<Path, Long> fileInfo = files.get(hi--);
|
||||||
|
|
||||||
|
// add the hi one
|
||||||
|
sizeGroups[g] += fileInfo.getSecond();
|
||||||
|
group.add(fileInfo.getFirst());
|
||||||
|
|
||||||
|
// change direction when at the end or the beginning
|
||||||
|
g += dir;
|
||||||
|
if (g == ngroups) {
|
||||||
|
dir = -1;
|
||||||
|
g = ngroups - 1;
|
||||||
|
} else if (g < 0) {
|
||||||
|
dir = 1;
|
||||||
|
g = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
for (int i = 0; i < sizeGroups.length; ++i) {
|
||||||
|
LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fileGroups;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Path getInputFolderPath(Configuration conf)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Path stagingDir = JobUtil.getStagingDir(conf);
|
||||||
|
return new Path(stagingDir, INPUT_FOLDER_PREFIX +
|
||||||
|
String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the input files, with the path to copy, for the MR job.
|
||||||
|
* Each input files contains n files, and each input file has a similar amount data to copy.
|
||||||
|
* The number of input files created are based on the number of mappers provided as argument
|
||||||
|
* and the number of the files to copy.
|
||||||
|
*/
|
||||||
|
private static Path[] createInputFiles(final Configuration conf,
|
||||||
|
final List<Pair<Path, Long>> snapshotFiles, int mappers)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Path inputFolderPath = getInputFolderPath(conf);
|
||||||
|
FileSystem fs = inputFolderPath.getFileSystem(conf);
|
||||||
|
LOG.debug("Input folder location: " + inputFolderPath);
|
||||||
|
|
||||||
|
List<List<Path>> splits = getBalancedSplits(snapshotFiles, mappers);
|
||||||
|
Path[] inputFiles = new Path[splits.size()];
|
||||||
|
|
||||||
|
Text key = new Text();
|
||||||
|
for (int i = 0; i < inputFiles.length; i++) {
|
||||||
|
List<Path> files = splits.get(i);
|
||||||
|
inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i));
|
||||||
|
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i],
|
||||||
|
Text.class, NullWritable.class);
|
||||||
|
LOG.debug("Input split: " + i);
|
||||||
|
try {
|
||||||
|
for (Path file: files) {
|
||||||
|
LOG.debug(file.toString());
|
||||||
|
key.set(file.toString());
|
||||||
|
writer.append(key, NullWritable.get());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return inputFiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run Map-Reduce Job to perform the files copy.
|
||||||
|
*/
|
||||||
|
private boolean runCopyJob(final Path inputRoot, final Path outputRoot,
|
||||||
|
final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
|
||||||
|
final String filesUser, final String filesGroup, final int filesMode,
|
||||||
|
final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
|
Configuration conf = getConf();
|
||||||
|
if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
|
||||||
|
if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
|
||||||
|
conf.setInt(CONF_FILES_MODE, filesMode);
|
||||||
|
conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
|
||||||
|
conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
|
||||||
|
conf.set(CONF_INPUT_ROOT, inputRoot.toString());
|
||||||
|
conf.setInt("mapreduce.job.maps", mappers);
|
||||||
|
|
||||||
|
Job job = new Job(conf);
|
||||||
|
job.setJobName("ExportSnapshot");
|
||||||
|
job.setJarByClass(ExportSnapshot.class);
|
||||||
|
job.setMapperClass(ExportMapper.class);
|
||||||
|
job.setInputFormatClass(SequenceFileInputFormat.class);
|
||||||
|
job.setOutputFormatClass(NullOutputFormat.class);
|
||||||
|
job.setMapSpeculativeExecution(false);
|
||||||
|
job.setNumReduceTasks(0);
|
||||||
|
for (Path path: createInputFiles(conf, snapshotFiles, mappers)) {
|
||||||
|
LOG.debug("Add Input Path=" + path);
|
||||||
|
SequenceFileInputFormat.addInputPath(job, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
return job.waitForCompletion(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the export snapshot by copying the snapshot metadata, hfiles and hlogs.
|
||||||
|
* @return 0 on success, and != 0 upon failure.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int run(String[] args) throws Exception {
|
||||||
|
boolean verifyChecksum = true;
|
||||||
|
String snapshotName = null;
|
||||||
|
String filesGroup = null;
|
||||||
|
String filesUser = null;
|
||||||
|
Path outputRoot = null;
|
||||||
|
int filesMode = 0;
|
||||||
|
int mappers = getConf().getInt("mapreduce.job.maps", 1);
|
||||||
|
|
||||||
|
// Process command line args
|
||||||
|
for (int i = 0; i < args.length; i++) {
|
||||||
|
String cmd = args[i];
|
||||||
|
try {
|
||||||
|
if (cmd.equals("-snapshot")) {
|
||||||
|
snapshotName = args[++i];
|
||||||
|
} else if (cmd.equals("-copy-to")) {
|
||||||
|
outputRoot = new Path(args[++i]);
|
||||||
|
} else if (cmd.equals("-no-checksum-verify")) {
|
||||||
|
verifyChecksum = false;
|
||||||
|
} else if (cmd.equals("-mappers")) {
|
||||||
|
mappers = Integer.parseInt(args[++i]);
|
||||||
|
} else if (cmd.equals("-chuser")) {
|
||||||
|
filesUser = args[++i];
|
||||||
|
} else if (cmd.equals("-chgroup")) {
|
||||||
|
filesGroup = args[++i];
|
||||||
|
} else if (cmd.equals("-chmod")) {
|
||||||
|
filesMode = Integer.parseInt(args[++i], 8);
|
||||||
|
} else if (cmd.equals("-h") || cmd.equals("--help")) {
|
||||||
|
printUsageAndExit();
|
||||||
|
} else {
|
||||||
|
System.err.println("UNEXPECTED: " + cmd);
|
||||||
|
printUsageAndExit();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
printUsageAndExit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check user options
|
||||||
|
if (snapshotName == null) {
|
||||||
|
System.err.println("Snapshot name not provided.");
|
||||||
|
printUsageAndExit();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (outputRoot == null) {
|
||||||
|
System.err.println("Destination file-system not provided.");
|
||||||
|
printUsageAndExit();
|
||||||
|
}
|
||||||
|
|
||||||
|
Configuration conf = getConf();
|
||||||
|
Path inputRoot = FSUtils.getRootDir(conf);
|
||||||
|
FileSystem inputFs = FileSystem.get(conf);
|
||||||
|
FileSystem outputFs = FileSystem.get(outputRoot.toUri(), new Configuration());
|
||||||
|
|
||||||
|
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
|
||||||
|
Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshotName, outputRoot);
|
||||||
|
Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, outputRoot);
|
||||||
|
|
||||||
|
// Check if the snapshot already exists
|
||||||
|
if (outputFs.exists(outputSnapshotDir)) {
|
||||||
|
System.err.println("The snapshot '" + snapshotName +
|
||||||
|
"' already exists in the destination: " + outputSnapshotDir);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the snapshot already in-progress
|
||||||
|
if (outputFs.exists(snapshotTmpDir)) {
|
||||||
|
System.err.println("A snapshot with the same name '" + snapshotName + "' is in-progress");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 0 - Extract snapshot files to copy
|
||||||
|
final List<Pair<Path, Long>> files = getSnapshotFiles(inputFs, snapshotDir);
|
||||||
|
|
||||||
|
// Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
|
||||||
|
// The snapshot references must be copied before the hfiles otherwise the cleaner
|
||||||
|
// will remove them because they are unreferenced.
|
||||||
|
try {
|
||||||
|
FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, false, conf);
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.err.println("Failed to copy the snapshot directory: from=" + snapshotDir +
|
||||||
|
" to=" + snapshotTmpDir);
|
||||||
|
e.printStackTrace(System.err);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2 - Start MR Job to copy files
|
||||||
|
// The snapshot references must be copied before the files otherwise the files gets removed
|
||||||
|
// by the HFileArchiver, since they have no references.
|
||||||
|
try {
|
||||||
|
if (!runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
|
||||||
|
filesUser, filesGroup, filesMode, mappers)) {
|
||||||
|
throw new ExportSnapshotException("Snapshot export failed!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
|
||||||
|
if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
|
||||||
|
System.err.println("Snapshot export failed!");
|
||||||
|
System.err.println("Unable to rename snapshot directory from=" +
|
||||||
|
snapshotTmpDir + " to=" + outputSnapshotDir);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("Snapshot export failed!");
|
||||||
|
e.printStackTrace(System.err);
|
||||||
|
outputFs.delete(outputSnapshotDir, true);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExportSnapshot
|
||||||
|
private void printUsageAndExit() {
|
||||||
|
System.err.printf("Usage: bin/hbase %s [options]\n", getClass().getName());
|
||||||
|
System.err.println(" where [options] are:");
|
||||||
|
System.err.println(" -h|-help Show this help and exit.");
|
||||||
|
System.err.println(" -snapshot NAME Snapshot to restore.");
|
||||||
|
System.err.println(" -copy-to NAME Remote destination hdfs://");
|
||||||
|
System.err.println(" -no-checksum-verify Do not verify checksum.");
|
||||||
|
System.err.println(" -chuser USERNAME Change the owner of the files to the specified one.");
|
||||||
|
System.err.println(" -chgroup GROUP Change the group of the files to the specified one.");
|
||||||
|
System.err.println(" -chmod MODE Change the permission of the files to the specified one.");
|
||||||
|
System.err.println(" -mappers Number of mappers to use during the copy (mapreduce.job.maps).");
|
||||||
|
System.err.println();
|
||||||
|
System.err.println("Examples:");
|
||||||
|
System.err.println(" hbase " + getClass() + " \\");
|
||||||
|
System.err.println(" -snapshot MySnapshot -copy-to hdfs:///srv2:8082/hbase \\");
|
||||||
|
System.err.println(" -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The guts of the {@link #main} method.
|
||||||
|
* Call this method to avoid the {@link #main(String[])} System.exit.
|
||||||
|
* @param args
|
||||||
|
* @return errCode
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
static int innerMain(final Configuration conf, final String [] args) throws Exception {
|
||||||
|
return ToolRunner.run(conf, new ExportSnapshot(), args);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
System.exit(innerMain(HBaseConfiguration.create(), args));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,254 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.snapshot.tool;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Export Snapshot Tool
|
||||||
|
*/
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestExportSnapshot {
|
||||||
|
private final Log LOG = LogFactory.getLog(getClass());
|
||||||
|
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private final static byte[] FAMILY = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
private byte[] snapshotName;
|
||||||
|
private byte[] tableName;
|
||||||
|
private HBaseAdmin admin;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
|
||||||
|
TEST_UTIL.startMiniCluster(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a table and take a snapshot of the table used by the export test.
|
||||||
|
*/
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
this.admin = TEST_UTIL.getHBaseAdmin();
|
||||||
|
|
||||||
|
long tid = System.currentTimeMillis();
|
||||||
|
tableName = Bytes.toBytes("testtb-" + tid);
|
||||||
|
snapshotName = Bytes.toBytes("snaptb0-" + tid);
|
||||||
|
|
||||||
|
// create Table
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||||
|
htd.addFamily(new HColumnDescriptor(FAMILY));
|
||||||
|
admin.createTable(htd, null);
|
||||||
|
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
|
||||||
|
TEST_UTIL.loadTable(table, FAMILY);
|
||||||
|
|
||||||
|
// take a snapshot
|
||||||
|
admin.disableTable(tableName);
|
||||||
|
admin.snapshot(snapshotName, tableName);
|
||||||
|
admin.enableTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
this.admin.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verfy the result of getBalanceSplits() method.
|
||||||
|
* The result are groups of files, used as input list for the "export" mappers.
|
||||||
|
* All the groups should have similar amount of data.
|
||||||
|
*
|
||||||
|
* The input list is a pair of file path and length.
|
||||||
|
* The getBalanceSplits() function sort it by length,
|
||||||
|
* and assign to each group a file, going back and forth through the groups.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testBalanceSplit() throws Exception {
|
||||||
|
// Create a list of files
|
||||||
|
List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
|
||||||
|
for (long i = 0; i <= 20; i++) {
|
||||||
|
files.add(new Pair<Path, Long>(new Path("file-" + i), i));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create 5 groups (total size 210)
|
||||||
|
// group 0: 20, 11, 10, 1 (total size: 42)
|
||||||
|
// group 1: 19, 12, 9, 2 (total size: 42)
|
||||||
|
// group 2: 18, 13, 8, 3 (total size: 42)
|
||||||
|
// group 3: 17, 12, 7, 4 (total size: 42)
|
||||||
|
// group 4: 16, 11, 6, 5 (total size: 42)
|
||||||
|
List<List<Path>> splits = ExportSnapshot.getBalancedSplits(files, 5);
|
||||||
|
assertEquals(5, splits.size());
|
||||||
|
assertEquals(Arrays.asList(new Path("file-20"), new Path("file-11"),
|
||||||
|
new Path("file-10"), new Path("file-1"), new Path("file-0")), splits.get(0));
|
||||||
|
assertEquals(Arrays.asList(new Path("file-19"), new Path("file-12"),
|
||||||
|
new Path("file-9"), new Path("file-2")), splits.get(1));
|
||||||
|
assertEquals(Arrays.asList(new Path("file-18"), new Path("file-13"),
|
||||||
|
new Path("file-8"), new Path("file-3")), splits.get(2));
|
||||||
|
assertEquals(Arrays.asList(new Path("file-17"), new Path("file-14"),
|
||||||
|
new Path("file-7"), new Path("file-4")), splits.get(3));
|
||||||
|
assertEquals(Arrays.asList(new Path("file-16"), new Path("file-15"),
|
||||||
|
new Path("file-6"), new Path("file-5")), splits.get(4));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify if exported snapshot and copied files matches the original one.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testExportFileSystemState() throws Exception {
|
||||||
|
Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis());
|
||||||
|
URI hdfsUri = FileSystem.get(TEST_UTIL.getConfiguration()).getUri();
|
||||||
|
FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
|
||||||
|
copyDir = copyDir.makeQualified(fs);
|
||||||
|
|
||||||
|
// Export Snapshot
|
||||||
|
int res = ExportSnapshot.innerMain(TEST_UTIL.getConfiguration(), new String[] {
|
||||||
|
"-snapshot", Bytes.toString(snapshotName),
|
||||||
|
"-copy-to", copyDir.toString()
|
||||||
|
});
|
||||||
|
assertEquals(0, res);
|
||||||
|
|
||||||
|
// Verify File-System state
|
||||||
|
FileStatus[] rootFiles = fs.listStatus(copyDir);
|
||||||
|
assertEquals(2, rootFiles.length);
|
||||||
|
for (FileStatus fileStatus: rootFiles) {
|
||||||
|
String name = fileStatus.getPath().getName();
|
||||||
|
assertTrue(fileStatus.isDir());
|
||||||
|
assertTrue(name.equals(".snapshot") || name.equals(".archive"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// compare the snapshot metadata and verify the hfiles
|
||||||
|
final FileSystem hdfs = FileSystem.get(hdfsUri, TEST_UTIL.getConfiguration());
|
||||||
|
final Path snapshotDir = new Path(".snapshot", Bytes.toString(snapshotName));
|
||||||
|
verifySnapshot(hdfs, new Path(TEST_UTIL.getDefaultRootDirPath(), snapshotDir),
|
||||||
|
fs, new Path(copyDir, snapshotDir));
|
||||||
|
verifyArchive(fs, copyDir, Bytes.toString(snapshotName));
|
||||||
|
|
||||||
|
// Remove the exported dir
|
||||||
|
fs.delete(copyDir, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* verify if the snapshot folder on file-system 1 match the one on file-system 2
|
||||||
|
*/
|
||||||
|
private void verifySnapshot(final FileSystem fs1, final Path root1,
|
||||||
|
final FileSystem fs2, final Path root2) throws IOException {
|
||||||
|
Set<String> s = new HashSet<String>();
|
||||||
|
assertEquals(listFiles(fs1, root1, root1), listFiles(fs2, root2, root2));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Verify if the files exists
|
||||||
|
*/
|
||||||
|
private void verifyArchive(final FileSystem fs, final Path rootDir, final String snapshotName)
|
||||||
|
throws IOException {
|
||||||
|
final Path exportedSnapshot = new Path(rootDir, new Path(".snapshot", snapshotName));
|
||||||
|
final Path exportedArchive = new Path(rootDir, ".archive");
|
||||||
|
LOG.debug(listFiles(fs, exportedArchive, exportedArchive));
|
||||||
|
SnapshotReferenceUtil.visitReferencedFiles(fs, exportedSnapshot,
|
||||||
|
new SnapshotReferenceUtil.FileVisitor() {
|
||||||
|
public void storeFile (final String region, final String family, final String hfile)
|
||||||
|
throws IOException {
|
||||||
|
verifyNonEmptyFile(new Path(exportedArchive,
|
||||||
|
new Path(Bytes.toString(tableName), new Path(region, new Path(family, hfile)))));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recoveredEdits (final String region, final String logfile)
|
||||||
|
throws IOException {
|
||||||
|
verifyNonEmptyFile(new Path(exportedSnapshot,
|
||||||
|
new Path(Bytes.toString(tableName), new Path(region, logfile))));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void logFile (final String server, final String logfile)
|
||||||
|
throws IOException {
|
||||||
|
verifyNonEmptyFile(new Path(exportedSnapshot, new Path(server, logfile)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyNonEmptyFile(final Path path) throws IOException {
|
||||||
|
LOG.debug(path);
|
||||||
|
assertTrue(fs.exists(path));
|
||||||
|
assertTrue(fs.getFileStatus(path).getLen() > 0);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> listFiles(final FileSystem fs, final Path root, final Path dir)
|
||||||
|
throws IOException {
|
||||||
|
Set<String> files = new HashSet<String>();
|
||||||
|
int rootPrefix = root.toString().length();
|
||||||
|
FileStatus[] list = FSUtils.listStatus(fs, dir);
|
||||||
|
if (list != null) {
|
||||||
|
for (FileStatus fstat: list) {
|
||||||
|
LOG.debug(fstat.getPath());
|
||||||
|
if (fstat.isDir()) {
|
||||||
|
files.addAll(listFiles(fs, root, fstat.getPath()));
|
||||||
|
} else {
|
||||||
|
files.add(fstat.getPath().toString().substring(rootPrefix));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue