HBASE-6610 HFileLink: Hardlink alternative for snapshot restore

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1391326 13f79535-47bb-0310-9956-ffa450edef68
parent 1a992218e3
commit 37ab60bcc5

@@ -0,0 +1,459 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io;

import java.util.ArrayList;
import java.util.Collection;

import java.io.IOException;
import java.io.InputStream;
import java.io.FileNotFoundException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;

/**
 * The FileLink is a sort of hardlink that allows a file to be accessed from a set of locations.
 *
 * <p><b>The Problem:</b>
 * <ul>
 *  <li>
 *    HDFS does not support hardlinks, which makes it impossible to reference
 *    the same data blocks using different names.
 *  </li>
 *  <li>
 *    HBase stores files in one location (e.g. table/region/family/) and, when a file is no
 *    longer needed (e.g. compaction, region deletion, ...), moves it to an archive directory.
 *  </li>
 * </ul>
 * If we want to create a reference to a file, we need to remember that it can be in its
 * original location or in the archive folder.
 * The FileLink class abstracts this concept: given a set of locations,
 * it is able to switch between them, making the operation transparent to the user.
 * More concrete implementations of the FileLink are the {@link HFileLink} and the {@link HLogLink}.
 *
 * <p><b>Back-references:</b>
 * To help the {@link CleanerChore} keep track of the links to a particular file,
 * a new file is placed inside a back-reference directory during the FileLink creation.
 * There is one back-reference directory for each file that has links,
 * and in that directory there is one file per link.
 *
 * <p>HFileLink Example
 * <ul>
 *  <li>
 *      /hbase/table/region-x/cf/file-k
 *      (Original File)
 *  </li>
 *  <li>
 *      /hbase/table-cloned/region-y/cf/file-k.region-x.table
 *      (HFileLink to the original file)
 *  </li>
 *  <li>
 *      /hbase/table-2nd-cloned/region-z/cf/file-k.region-x.table
 *      (HFileLink to the original file)
 *  </li>
 *  <li>
 *      /hbase/.archive/table/region-x/.links-file-k/region-y.table-cloned
 *      (Back-reference to the link in table-cloned)
 *  </li>
 *  <li>
 *      /hbase/.archive/table/region-x/.links-file-k/region-z.table-2nd-cloned
 *      (Back-reference to the link in table-2nd-cloned)
 *  </li>
 * </ul>
 */
@InterfaceAudience.Private
public class FileLink {
  private static final Log LOG = LogFactory.getLog(FileLink.class);

  /** Define the Back-reference directory name prefix: .links-<hfile>/ */
  public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";

  /**
   * FileLink InputStream that handles the switch between the original path
   * and the alternative locations, when the file is moved.
   */
  private static class FileLinkInputStream extends InputStream
      implements Seekable, PositionedReadable {
    private FSDataInputStream in = null;
    private Path currentPath = null;
    private long pos = 0;

    private final FileLink fileLink;
    private final int bufferSize;
    private final FileSystem fs;

    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
        throws IOException {
      this(fs, fileLink, fs.getConf().getInt("io.file.buffer.size", 4096));
    }

    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
        throws IOException {
      this.bufferSize = bufferSize;
      this.fileLink = fileLink;
      this.fs = fs;

      this.in = tryOpen();
    }

    @Override
    public int read() throws IOException {
      int res;
      try {
        res = in.read();
      } catch (FileNotFoundException e) {
        res = tryOpen().read();
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        res = tryOpen().read();
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        res = tryOpen().read();
      }
      if (res > 0) pos += 1;
      return res;
    }

    @Override
    public int read(byte b[]) throws IOException {
      return read(b, 0, b.length);
    }

    @Override
    public int read(byte b[], int off, int len) throws IOException {
      int n;
      try {
        n = in.read(b, off, len);
      } catch (FileNotFoundException e) {
        n = tryOpen().read(b, off, len);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        n = tryOpen().read(b, off, len);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        n = tryOpen().read(b, off, len);
      }
      if (n > 0) pos += n;
      assert(in.getPos() == pos);
      return n;
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
      int n;
      try {
        n = in.read(position, buffer, offset, length);
      } catch (FileNotFoundException e) {
        n = tryOpen().read(position, buffer, offset, length);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        n = tryOpen().read(position, buffer, offset, length);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        n = tryOpen().read(position, buffer, offset, length);
      }
      return n;
    }

    @Override
    public void readFully(long position, byte[] buffer) throws IOException {
      readFully(position, buffer, 0, buffer.length);
    }

    @Override
    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
      try {
        in.readFully(position, buffer, offset, length);
      } catch (FileNotFoundException e) {
        tryOpen().readFully(position, buffer, offset, length);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        tryOpen().readFully(position, buffer, offset, length);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        tryOpen().readFully(position, buffer, offset, length);
      }
    }

    @Override
    public long skip(long n) throws IOException {
      long skipped;

      try {
        skipped = in.skip(n);
      } catch (FileNotFoundException e) {
        skipped = tryOpen().skip(n);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        skipped = tryOpen().skip(n);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        skipped = tryOpen().skip(n);
      }

      if (skipped > 0) pos += skipped;
      return skipped;
    }

    @Override
    public int available() throws IOException {
      try {
        return in.available();
      } catch (FileNotFoundException e) {
        return tryOpen().available();
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        return tryOpen().available();
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        return tryOpen().available();
      }
    }

    @Override
    public void seek(long pos) throws IOException {
      try {
        in.seek(pos);
      } catch (FileNotFoundException e) {
        tryOpen().seek(pos);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        tryOpen().seek(pos);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        tryOpen().seek(pos);
      }
      this.pos = pos;
    }

    @Override
    public long getPos() throws IOException {
      return pos;
    }

    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      boolean res;
      try {
        res = in.seekToNewSource(targetPos);
      } catch (FileNotFoundException e) {
        res = tryOpen().seekToNewSource(targetPos);
      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
        res = tryOpen().seekToNewSource(targetPos);
      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
        res = tryOpen().seekToNewSource(targetPos);
      }
      if (res) pos = targetPos;
      return res;
    }

    @Override
    public void close() throws IOException {
      in.close();
    }

    @Override
    public synchronized void mark(int readlimit) {
    }

    @Override
    public synchronized void reset() throws IOException {
      throw new IOException("mark/reset not supported");
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    /**
     * Try to open the file from one of the available locations.
     *
     * @return FSDataInputStream stream of the opened file link
     * @throws IOException on unexpected error, or file not found.
     */
    private FSDataInputStream tryOpen() throws IOException {
      for (Path path: fileLink.getLocations()) {
        if (path.equals(currentPath)) continue;
        try {
          in = fs.open(path, bufferSize);
          in.seek(pos);
          assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
          if (LOG.isTraceEnabled()) {
            if (currentPath == null) {
              LOG.debug("link open path=" + path);
            } else {
              LOG.trace("link switch from path=" + currentPath + " to path=" + path);
            }
          }
          currentPath = path;
          return(in);
        } catch (FileNotFoundException e) {
          // Try another file location
        }
      }
      throw new FileNotFoundException("Unable to open link: " + fileLink);
    }
  }

  private Path[] locations = null;

  protected FileLink() {
    this.locations = null;
  }

  /**
   * @param originPath Original location of the file to link
   * @param alternativePaths Alternative locations to look for the linked file
   */
  public FileLink(Path originPath, Path... alternativePaths) {
    setLocations(originPath, alternativePaths);
  }

  /**
   * @param locations locations to look for the linked file
   */
  public FileLink(final Collection<Path> locations) {
    this.locations = locations.toArray(new Path[locations.size()]);
  }

  /**
   * @return the locations to look for the linked file.
   */
  public Path[] getLocations() {
    return locations;
  }

  public String toString() {
    StringBuilder str = new StringBuilder(getClass().getName());
    str.append(" locations=[");
    int i = 0;
    for (Path location: locations) {
      if (i++ > 0) str.append(", ");
      str.append(location.toString());
    }
    str.append("]");
    return str.toString();
  }

  /**
   * @return the path of the first available link.
   */
  public Path getAvailablePath(FileSystem fs) throws IOException {
    for (Path path: locations) {
      if (fs.exists(path)) {
        return path;
      }
    }
    throw new FileNotFoundException("Unable to open link: " + this);
  }

  /**
   * Get the FileStatus of the referenced file.
   *
   * @param fs {@link FileSystem} on which to get the file status
   * @return FileStatus of the referenced file.
   * @throws IOException on unexpected error.
   */
  public FileStatus getFileStatus(FileSystem fs) throws IOException {
    for (Path path: locations) {
      try {
        return fs.getFileStatus(path);
      } catch (FileNotFoundException e) {
        // Try another file location
      }
    }
    throw new FileNotFoundException("Unable to open link: " + this);
  }

  /**
   * Open the FileLink for read.
   * <p>
   * It uses a wrapper of FSDataInputStream that is agnostic to the location
   * of the file, even if the file switches between locations.
   *
   * @param fs {@link FileSystem} on which to open the FileLink
   * @return InputStream for reading the file link.
   * @throws IOException on unexpected error.
   */
  public FSDataInputStream open(final FileSystem fs) throws IOException {
    return new FSDataInputStream(new FileLinkInputStream(fs, this));
  }

  /**
   * Open the FileLink for read.
   * <p>
   * It uses a wrapper of FSDataInputStream that is agnostic to the location
   * of the file, even if the file switches between locations.
   *
   * @param fs {@link FileSystem} on which to open the FileLink
   * @param bufferSize the size of the buffer to be used.
   * @return InputStream for reading the file link.
   * @throws IOException on unexpected error.
   */
  public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
    return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
  }

  /**
   * NOTE: This method must be used only in the constructor!
   * It creates a List with the specified locations for the link.
   */
  protected void setLocations(Path originPath, Path... alternativePaths) {
    assert this.locations == null : "Link locations already set";
    this.locations = new Path[1 + alternativePaths.length];
    this.locations[0] = originPath;
    for (int i = 0; i < alternativePaths.length; i++) {
      this.locations[i + 1] = alternativePaths[i];
    }
  }

  /**
   * Get the directory to store the link back references
   *
   * <p>To simplify the reference count process, during the FileLink creation
   * a back-reference is added to the back-reference directory of the specified file.
   *
   * @param storeDir Root directory for the link reference folder
   * @param fileName File Name with links
   * @return Path for the link back references.
   */
  public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
    return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
  }

  /**
   * Get the referenced file name from the reference link directory path.
   *
   * @param dirPath Link references directory path
   * @return Name of the file referenced
   */
  public static String getBackReferenceFileName(final Path dirPath) {
    return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
  }

  /**
   * Checks if the specified directory path is a back reference links folder.
   *
   * @param dirPath Directory path to verify
   * @return True if the specified directory is a link references folder
   */
  public static boolean isBackReferencesDir(final Path dirPath) {
    if (dirPath == null) return false;
    return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
  }
}
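The sketch below is not part of the patch; it only illustrates how a FileLink is meant to be consumed. The Configuration, the two paths, and the byte-counting loop are invented for the example, and real callers normally go through a subclass such as HFileLink rather than constructing a FileLink directly.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FileLink;

public class FileLinkReadSketch {
  // Reads a file through a FileLink; both locations are made up for the example.
  public static long countBytes(Configuration conf) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    Path origin  = new Path("/hbase/table/region-x/cf/file-k");          // hypothetical
    Path archive = new Path("/hbase/.archive/table/region-x/cf/file-k"); // hypothetical

    FileLink link = new FileLink(origin, archive);
    FSDataInputStream in = link.open(fs);   // backed by FileLinkInputStream
    try {
      byte[] buf = new byte[8192];
      long total = 0;
      int n;
      while ((n = in.read(buf)) > 0) {
        // If the file is moved from 'origin' to 'archive' between reads,
        // the stream re-opens the other location and seeks back to 'pos'.
        total += n;
      }
      return total;
    } finally {
      in.close();
    }
  }
}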
@@ -0,0 +1,366 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.FSUtils;

/**
 * HFileLink describes a link to an hfile.
 *
 * An hfile can be served from a region or from the hfile archive directory as
 * specified by the {@value HConstants#HFILE_ARCHIVE_DIRECTORY} conf property.
 * HFileLink allows access to the referenced hfile regardless of its current location.
 *
 * <p>Searches for hfiles in the following order and locations:
 * <ul>
 *  <li>/hbase/table/region/cf/hfile</li>
 *  <li>/hbase/archive/table/region/cf/hfile</li>
 * </ul>
 *
 * The link checks the original path first; if the hfile is not present there,
 * it falls back to the archived path.
 */
@InterfaceAudience.Private
public class HFileLink extends FileLink {
  private static final Log LOG = LogFactory.getLog(HFileLink.class);

  /** Define the HFile Link name pattern in the form of: hfile-region-table */
  public static final Pattern LINK_NAME_PARSER =
    Pattern.compile("^([0-9a-f\\.]+)-([0-9a-f]+)-([a-zA-Z_0-9]+[a-zA-Z0-9_\\-\\.]*)$");

  private final Path archivePath;
  private final Path originPath;

  /**
   * @param conf {@link Configuration} from which to extract specific archive locations
   * @param path The path of the HFile Link.
   * @throws IOException on unexpected error.
   */
  public HFileLink(Configuration conf, Path path) throws IOException {
    this(FSUtils.getRootDir(conf), HFileArchiveUtil.getArchivePath(conf), path);
  }

  /**
   * @param rootDir Path to the root directory where hbase files are stored
   * @param archiveDir Path to the hbase archive directory
   * @param path The path of the HFile Link.
   */
  public HFileLink(final Path rootDir, final Path archiveDir, final Path path) {
    Path hfilePath = getRelativeTablePath(path);
    this.originPath = new Path(rootDir, hfilePath);
    this.archivePath = new Path(archiveDir, hfilePath);
    setLocations(originPath, archivePath);
  }

  /**
   * @param originPath Path to the hfile in the table directory
   * @param archivePath Path to the hfile in the archive directory
   */
  public HFileLink(final Path originPath, final Path archivePath) {
    this.originPath = originPath;
    this.archivePath = archivePath;
    setLocations(originPath, archivePath);
  }

  /**
   * @return the origin path of the hfile.
   */
  public Path getOriginPath() {
    return this.originPath;
  }

  /**
   * @return the path of the archived hfile.
   */
  public Path getArchivePath() {
    return this.archivePath;
  }

  /**
   * @param path Path to check.
   * @return True if the path is a HFileLink.
   */
  public static boolean isHFileLink(final Path path) {
    return isHFileLink(path.getName());
  }

  /**
   * @param fileName File name to check.
   * @return True if the file name is a HFileLink.
   */
  public static boolean isHFileLink(String fileName) {
    Matcher m = LINK_NAME_PARSER.matcher(fileName);
    if (!m.matches()) return false;

    return m.groupCount() > 2 && m.group(2) != null && m.group(3) != null;
  }

  /**
   * The returned path can be the "original" file path like: /hbase/table/region/cf/hfile
   * or a path to the archived file like: /hbase/archive/table/region/cf/hfile
   *
   * @param fs {@link FileSystem} on which to check the HFileLink
   * @param path HFileLink path
   * @return Referenced path (original path or archived path)
   * @throws IOException on unexpected error.
   */
  public static Path getReferencedPath(FileSystem fs, final Path path) throws IOException {
    return getReferencedPath(fs.getConf(), fs, path);
  }

  /**
   * The returned path can be the "original" file path like: /hbase/table/region/cf/hfile
   * or a path to the archived file like: /hbase/archive/table/region/cf/hfile
   *
   * @param fs {@link FileSystem} on which to check the HFileLink
   * @param conf {@link Configuration} from which to extract specific archive locations
   * @param path HFileLink path
   * @return Referenced path (original path or archived path)
   * @throws IOException on unexpected error.
   */
  public static Path getReferencedPath(final Configuration conf, final FileSystem fs,
      final Path path) throws IOException {
    return getReferencedPath(fs, FSUtils.getRootDir(conf),
                             HFileArchiveUtil.getArchivePath(conf), path);
  }

  /**
   * The returned path can be the "original" file path like: /hbase/table/region/cf/hfile
   * or a path to the archived file like: /hbase/archive/table/region/cf/hfile
   *
   * @param fs {@link FileSystem} on which to check the HFileLink
   * @param rootDir root hbase directory
   * @param archiveDir Path to the hbase archive directory
   * @param path HFileLink path
   * @return Referenced path (original path or archived path)
   * @throws IOException on unexpected error.
   */
  public static Path getReferencedPath(final FileSystem fs, final Path rootDir,
      final Path archiveDir, final Path path) throws IOException {
    Path hfilePath = getRelativeTablePath(path);

    Path originPath = new Path(rootDir, hfilePath);
    if (fs.exists(originPath)) {
      return originPath;
    }

    return new Path(archiveDir, hfilePath);
  }

  /**
   * Convert a HFileLink path to a table relative path.
   * e.g. the link: /hbase/test/0123/cf/abcd-4567-testtb
   *      becomes: /hbase/testtb/4567/cf/abcd
   *
   * @param path HFileLink path
   * @return Relative table path
   */
  private static Path getRelativeTablePath(final Path path) {
    // hfile-region-table
    Matcher m = LINK_NAME_PARSER.matcher(path.getName());
    if (!m.matches()) {
      throw new IllegalArgumentException(path.getName() + " is not a valid HFileLink name!");
    }

    // Convert the HFileLink name into a real table/region/cf/hfile path.
    String hfileName = m.group(1);
    String regionName = m.group(2);
    String tableName = m.group(3);
    String familyName = path.getParent().getName();
    return new Path(new Path(tableName, regionName), new Path(familyName, hfileName));
  }

  /**
   * Get the HFile name of the referenced link
   *
   * @param fileName HFileLink file name
   * @return the name of the referenced HFile
   */
  public static String getReferencedHFileName(final String fileName) {
    Matcher m = LINK_NAME_PARSER.matcher(fileName);
    if (!m.matches()) {
      throw new IllegalArgumentException(fileName + " is not a valid HFileLink name!");
    }
    return(m.group(1));
  }

  /**
   * Get the Region name of the referenced link
   *
   * @param fileName HFileLink file name
   * @return the name of the referenced Region
   */
  public static String getReferencedRegionName(final String fileName) {
    Matcher m = LINK_NAME_PARSER.matcher(fileName);
    if (!m.matches()) {
      throw new IllegalArgumentException(fileName + " is not a valid HFileLink name!");
    }
    return(m.group(2));
  }

  /**
   * Get the Table name of the referenced link
   *
   * @param fileName HFileLink file name
   * @return the name of the referenced Table
   */
  public static String getReferencedTableName(final String fileName) {
    Matcher m = LINK_NAME_PARSER.matcher(fileName);
    if (!m.matches()) {
      throw new IllegalArgumentException(fileName + " is not a valid HFileLink name!");
    }
    return(m.group(3));
  }

  /**
   * Create a new HFileLink name
   *
   * @param hfileRegionInfo - Linked HFile Region Info
   * @param hfileName - Linked HFile name
   * @return file name of the HFile Link
   */
  public static String createHFileLinkName(final HRegionInfo hfileRegionInfo,
      final String hfileName) {
    return createHFileLinkName(hfileRegionInfo.getTableNameAsString(),
                      hfileRegionInfo.getEncodedName(), hfileName);
  }

  /**
   * Create a new HFileLink name
   *
   * @param tableName - Linked HFile table name
   * @param regionName - Linked HFile region name
   * @param hfileName - Linked HFile name
   * @return file name of the HFile Link
   */
  public static String createHFileLinkName(final String tableName,
      final String regionName, final String hfileName) {
    return String.format("%s-%s-%s", hfileName, regionName, tableName);
  }

  /**
   * Create a new HFileLink
   *
   * <p>It also adds a back-reference to the hfile back-reference directory
   * to simplify the reference-count and the cleaning process.
   *
   * @param conf {@link Configuration} to read for the archive directory name
   * @param fs {@link FileSystem} on which to write the HFileLink
   * @param dstFamilyPath - Destination path (table/region/cf/)
   * @param hfileRegionInfo - Linked HFile Region Info
   * @param hfileName - Linked HFile name
   * @return true if the file is created, otherwise the file exists.
   * @throws IOException on file or parent directory creation failure
   */
  public static boolean create(final Configuration conf, final FileSystem fs,
      final Path dstFamilyPath, final HRegionInfo hfileRegionInfo,
      final String hfileName) throws IOException {
    String familyName = dstFamilyPath.getName();
    String regionName = dstFamilyPath.getParent().getName();
    String tableName = dstFamilyPath.getParent().getParent().getName();

    String name = createHFileLinkName(hfileRegionInfo, hfileName);
    String refName = createBackReferenceName(tableName, regionName);

    // Make sure the destination directory exists
    fs.mkdirs(dstFamilyPath);

    // Make sure the FileLink reference directory exists
    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
          hfileRegionInfo.getTableNameAsString(), hfileRegionInfo.getEncodedName(), familyName);
    Path backRefsDir = getBackReferencesDir(archiveStoreDir, hfileName);
    fs.mkdirs(backRefsDir);

    // Create the reference for the link
    Path backRefPath = new Path(backRefsDir, refName);
    fs.createNewFile(backRefPath);
    try {
      // Create the link
      return fs.createNewFile(new Path(dstFamilyPath, name));
    } catch (IOException e) {
      LOG.error("couldn't create the link=" + name + " for " + dstFamilyPath, e);
      // Revert the reference if the link creation failed
      fs.delete(backRefPath, false);
      throw e;
    }
  }

  /**
   * Create the back reference name
   */
  private static String createBackReferenceName(final String tableName, final String regionName) {
    return regionName + "." + tableName;
  }

  /**
   * Get the full path of the HFile referenced by the back reference
   *
   * @param rootDir root hbase directory
   * @param linkRefPath Link Back Reference path
   * @return full path of the referenced hfile
   */
  public static Path getHFileFromBackReference(final Path rootDir, final Path linkRefPath) {
    int separatorIndex = linkRefPath.getName().indexOf('.');
    String linkRegionName = linkRefPath.getName().substring(0, separatorIndex);
    String linkTableName = linkRefPath.getName().substring(separatorIndex + 1);
    String hfileName = getBackReferenceFileName(linkRefPath.getParent());
    Path familyPath = linkRefPath.getParent().getParent();
    Path regionPath = familyPath.getParent();
    Path tablePath = regionPath.getParent();

    String linkName = createHFileLinkName(tablePath.getName(), regionPath.getName(), hfileName);
    Path linkTableDir = FSUtils.getTablePath(rootDir, linkTableName);
    Path regionDir = HRegion.getRegionDir(linkTableDir, linkRegionName);
    return new Path(new Path(regionDir, familyPath.getName()), linkName);
  }

  /**
   * Get the full path of the HFile referenced by the back reference
   *
   * @param conf {@link Configuration} to read for the archive directory name
   * @param linkRefPath Link Back Reference path
   * @return full path of the referenced hfile
   * @throws IOException on unexpected error.
   */
  public static Path getHFileFromBackReference(final Configuration conf, final Path linkRefPath)
      throws IOException {
    return getHFileFromBackReference(FSUtils.getRootDir(conf), linkRefPath);
  }
}
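A quick illustration of the naming scheme implemented above, reusing the abcd-4567-testtb example from the getRelativeTablePath() javadoc; the table, region, and hfile names are invented for the example, and the expected values are shown as comments.

import org.apache.hadoop.hbase.io.HFileLink;

public class HFileLinkNameSketch {
  public static void main(String[] args) {
    // Hypothetical values: an hfile "abcd" from region "4567" of table "testtb".
    String linkName = HFileLink.createHFileLinkName("testtb", "4567", "abcd");
    System.out.println(linkName);                                    // abcd-4567-testtb
    System.out.println(HFileLink.isHFileLink(linkName));             // true
    System.out.println(HFileLink.getReferencedHFileName(linkName));  // abcd
    System.out.println(HFileLink.getReferencedRegionName(linkName)); // 4567
    System.out.println(HFileLink.getReferencedTableName(linkName));  // testtb
  }
}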
@@ -590,6 +590,39 @@ public class HFile {
         preferredEncodingInCache, hfs);
   }
 
+  /**
+   * @param fs A file system
+   * @param path Path to HFile
+   * @param fsdis an open checksummed stream of path's file
+   * @param fsdisNoFsChecksum an open unchecksummed stream of path's file
+   * @param size max size of the trailer.
+   * @param cacheConf Cache configuration for hfile's contents
+   * @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
+   * @param closeIStream boolean for closing the file after getting the reader version.
+   * @return A version-specific HFile Reader
+   * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
+   */
+  public static Reader createReaderWithEncoding(
+      FileSystem fs, Path path, FSDataInputStream fsdis,
+      FSDataInputStream fsdisNoFsChecksum, long size, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache, boolean closeIStream)
+      throws IOException {
+    HFileSystem hfs = null;
+
+    // If the fs is not an instance of HFileSystem, then create an
+    // instance of HFileSystem that wraps over the specified fs.
+    // In this case, we will not be able to avoid checksumming inside
+    // the filesystem.
+    if (!(fs instanceof HFileSystem)) {
+      hfs = new HFileSystem(fs);
+    } else {
+      hfs = (HFileSystem)fs;
+    }
+    return pickReaderVersion(path, fsdis, fsdisNoFsChecksum, size,
+        closeIStream, cacheConf,
+        preferredEncodingInCache, hfs);
+  }
+
   /**
    *
    * @param fs filesystem
@@ -82,7 +82,10 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore
     if (logCleaners != null) {
       for (String className : logCleaners) {
         T logCleaner = newFileCleaner(className, conf);
-        if (logCleaner != null) this.cleanersChain.add(logCleaner);
+        if (logCleaner != null) {
+          LOG.debug("initialize cleaner=" + className);
+          this.cleanersChain.add(logCleaner);
+        }
       }
     }
   }
@@ -196,7 +199,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore
    */
   private void checkAndDelete(Path filePath) throws IOException, IllegalArgumentException {
     if (!validate(filePath)) {
-      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + "deleting it.");
+      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it.");
       if (!this.fs.delete(filePath, true)) {
         LOG.warn("Attempted to delete:" + filePath
             + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 
 /**
  * This Chore, every time it runs, will clear the HFiles in the hfile archive
@@ -46,6 +47,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
 
   @Override
   protected boolean validate(Path file) {
+    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
+      return true;
+    }
     return StoreFile.validateStoreFileName(file.getName());
   }
 }
@@ -0,0 +1,91 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;

/**
 * HFileLink cleaner that determines if an hfile should be deleted.
 * HFiles can be deleted only if there are no links to them.
 *
 * When an HFileLink is created, a back-reference file is created in:
 *      /hbase/archive/table/region/cf/.links-hfile/ref-region.ref-table
 * To check if the hfile can be deleted, the back-references folder must be empty.
 */
@InterfaceAudience.Private
public class HFileLinkCleaner extends BaseHFileCleanerDelegate {
  private static final Log LOG = LogFactory.getLog(HFileLinkCleaner.class);

  private FileSystem fs = null;

  @Override
  public synchronized boolean isFileDeletable(Path filePath) {
    if (this.fs == null) return false;

    // HFile Link is always deletable
    if (HFileLink.isHFileLink(filePath)) return true;

    // If the file is inside a link references directory, it means it is a back-reference link.
    // The back-reference can be deleted only if the referenced file doesn't exist.
    Path parentDir = filePath.getParent();
    if (HFileLink.isBackReferencesDir(parentDir)) {
      try {
        Path hfilePath = HFileLink.getHFileFromBackReference(getConf(), filePath);
        return !fs.exists(hfilePath);
      } catch (IOException e) {
        LOG.error("Couldn't verify if the referenced file still exists, keep it just in case");
        return false;
      }
    }

    // HFile is deletable only if it has no links
    try {
      Path backRefDir = HFileLink.getBackReferencesDir(parentDir, filePath.getName());
      return FSUtils.listStatus(fs, backRefDir) == null;
    } catch (IOException e) {
      LOG.error("Couldn't get the references, not deleting file, just in case");
      return false;
    }
  }

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);

    // setup filesystem
    try {
      this.fs = FileSystem.get(this.getConf());
    } catch (IOException e) {
      LOG.error("Couldn't instantiate the file system, not deleting file, just in case");
    }
  }
}
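To make the cleaner's rule concrete, the sketch below walks one invented back-reference through HFileLink.getHFileFromBackReference(); the /hbase root and the testtb/clonedtb names are hypothetical and only mirror the layout described in the class comment above.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HFileLink;

public class BackReferenceSketch {
  public static void main(String[] args) {
    // Hypothetical example: a back-reference left in the archive of table "testtb"
    // for a link created in table "clonedtb", region "89ab", pointing at hfile "abcd".
    Path rootDir = new Path("/hbase");
    Path backRef = new Path("/hbase/archive/testtb/4567/cf/.links-abcd/89ab.clonedtb");

    // Resolves to the link file the cleaner checks before deleting "abcd":
    //   /hbase/clonedtb/89ab/cf/abcd-4567-testtb
    Path linkPath = HFileLink.getHFileFromBackReference(rootDir, backRef);
    System.out.println(linkPath);
  }
}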
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
@@ -328,10 +329,28 @@ public class HStore extends SchemaConfigured implements Store {
    */
   public static Path getStoreHomedir(final Path tabledir,
       final String encodedName, final byte [] family) {
-    return new Path(tabledir, new Path(encodedName,
-      new Path(Bytes.toString(family))));
+    return getStoreHomedir(tabledir, encodedName, Bytes.toString(family));
   }
+
+  /**
+   * @param tabledir
+   * @param encodedName Encoded region name.
+   * @param family
+   * @return Path to family/Store home directory.
+   */
+  public static Path getStoreHomedir(final Path tabledir,
+      final String encodedName, final String family) {
+    return new Path(tabledir, new Path(encodedName, new Path(family)));
+  }
+
+  /**
+   * @param parentRegionDirectory directory for the parent region
+   * @param family family name of this store
+   * @return Path to the family/Store home directory
+   */
+  public static Path getStoreHomedir(final Path parentRegionDirectory, final byte[] family) {
+    return new Path(parentRegionDirectory, new Path(Bytes.toString(family)));
+  }
   /**
    * Return the directory in which this store stores its
    * StoreFiles
@@ -383,9 +402,10 @@ public class HStore extends SchemaConfigured implements Store {
         continue;
       }
       final Path p = files[i].getPath();
-      // Check for empty file. Should never be the case but can happen
+      // Check for empty hfile. Should never be the case but can happen
       // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
-      if (this.fs.getFileStatus(p).getLen() <= 0) {
+      // NOTE: that the HFileLink is just a name, so it's an empty file.
+      if (!HFileLink.isHFileLink(p) && this.fs.getFileStatus(p).getLen() <= 0) {
         LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
         continue;
       }
@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,8 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -151,6 +154,9 @@ public class StoreFile extends SchemaConfigured {
   // If this StoreFile references another, this is the other files path.
   private Path referencePath;
 
+  // If this storefile is a link to another, this is the link instance.
+  private HFileLink link;
+
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
 
@@ -245,9 +251,14 @@ public class StoreFile extends SchemaConfigured {
     this.dataBlockEncoder =
         dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE
         : dataBlockEncoder;
-    if (isReference(p)) {
+
+    if (HFileLink.isHFileLink(p)) {
+      this.link = new HFileLink(conf, p);
+      LOG.debug("Store file " + p + " is a link");
+    } else if (isReference(p)) {
       this.reference = Reference.read(fs, p);
       this.referencePath = getReferredToFile(this.path);
+      LOG.debug("Store file " + p + " is a reference");
     }
 
     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
@@ -291,6 +302,13 @@ public class StoreFile extends SchemaConfigured {
     return this.reference != null;
   }
 
+  /**
+   * @return <tt>true</tt> if this StoreFile is an HFileLink
+   */
+  boolean isLink() {
+    return this.link != null;
+  }
+
   /**
    * @param p Path to check.
    * @return True if the path has format of a HStoreFile reference.
@@ -476,6 +494,7 @@ public class StoreFile extends SchemaConfigured {
       Path referencePath = getReferredToFile(p);
       return computeRefFileHDFSBlockDistribution(fs, reference, referencePath);
     } else {
+      if (HFileLink.isHFileLink(p)) p = HFileLink.getReferencedPath(fs, p);
       FileStatus status = fs.getFileStatus(p);
       long length = status.getLen();
       return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
@@ -491,7 +510,12 @@ public class StoreFile extends SchemaConfigured {
       this.hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(
           this.fs, this.reference, this.referencePath);
     } else {
-      FileStatus status = this.fs.getFileStatus(this.path);
+      FileStatus status;
+      if (isLink()) {
+        status = link.getFileStatus(fs);
+      } else {
+        status = this.fs.getFileStatus(path);
+      }
       long length = status.getLen();
       this.hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(
           this.fs, status, 0, length);
@@ -512,6 +536,10 @@ public class StoreFile extends SchemaConfigured {
       this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
           this.cacheConf, this.reference,
           dataBlockEncoder.getEncodingInCache());
+    } else if (isLink()) {
+      long size = link.getFileStatus(fs).getLen();
+      this.reader = new Reader(this.fs, this.path, link, size, this.cacheConf,
+          dataBlockEncoder.getEncodingInCache(), true);
     } else {
       this.reader = new Reader(this.fs, this.path, this.cacheConf,
           dataBlockEncoder.getEncodingInCache());
@@ -888,6 +916,8 @@ public class StoreFile extends SchemaConfigured {
    * @return <tt>true</tt> if the file could be a valid store file, <tt>false</tt> otherwise
    */
   public static boolean validateStoreFileName(String fileName) {
+    if (HFileLink.isHFileLink(fileName))
+      return true;
     return !fileName.contains("-");
   }
 
@@ -1277,6 +1307,23 @@ public class StoreFile extends SchemaConfigured {
       bloomFilterType = BloomType.NONE;
     }
 
+    public Reader(FileSystem fs, Path path, HFileLink hfileLink, long size,
+        CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
+        boolean closeIStream) throws IOException {
+      super(path);
+
+      FSDataInputStream in = hfileLink.open(fs);
+      FSDataInputStream inNoChecksum = in;
+      if (fs instanceof HFileSystem) {
+        FileSystem noChecksumFs = ((HFileSystem)fs).getNoChecksumFs();
+        inNoChecksum = hfileLink.open(noChecksumFs);
+      }
+
+      reader = HFile.createReaderWithEncoding(fs, path, in, inNoChecksum,
+          size, cacheConf, preferredEncodingInCache, closeIStream);
+      bloomFilterType = BloomType.NONE;
+    }
+
     /**
      * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
      */
@ -1111,6 +1111,25 @@ public abstract class FSUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a particular region dir, return all the familydirs inside it
|
||||||
|
*
|
||||||
|
* @param fs A file system for the Path
|
||||||
|
* @param regionDir Path to a specific region directory
|
||||||
|
* @return List of paths to valid family directories in region dir.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
|
||||||
|
// assumes we are in a region dir.
|
||||||
|
FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
|
||||||
|
List<Path> familyDirs = new ArrayList<Path>(fds.length);
|
||||||
|
for (FileStatus fdfs: fds) {
|
||||||
|
Path fdPath = fdfs.getPath();
|
||||||
|
familyDirs.add(fdPath);
|
||||||
|
}
|
||||||
|
return familyDirs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Filter for HFiles that excludes reference files.
|
* Filter for HFiles that excludes reference files.
|
||||||
*/
|
*/
|
||||||
|
@@ -1209,7 +1228,7 @@

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
-  * This would accommodate difference in various hadoop versions
+  * This accommodates differences between hadoop versions
   *
   * @param fs file system
   * @param dir directory
@@ -1228,7 +1247,19 @@
    if (status == null || status.length < 1) return null;
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
   * This accommodates differences between hadoop versions
   *
   * @param fs file system
   * @param dir directory
   * @return null if tabledir doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.delete() and returns the value returned by the fs.delete()
   *
@@ -39,6 +39,21 @@ public class HFileArchiveUtil {
    // non-external instantiation - util class
  }

  /**
   * Get the directory to archive a store directory
   * @param conf {@link Configuration} to read for the archive directory name
   * @param tableName table name under which the store currently lives
   * @param regionName region encoded name under which the store currently lives
   * @param family name of the family in the store
   * @return {@link Path} to the directory to archive the given store or
   *         <tt>null</tt> if it should not be archived
   */
  public static Path getStoreArchivePath(final Configuration conf, final String tableName,
      final String regionName, final String familyName) throws IOException {
    Path tableArchiveDir = getTableArchivePath(conf, tableName);
    return HStore.getStoreHomedir(tableArchiveDir, regionName, familyName);
  }

  /**
   * Get the directory to archive a store directory
   * @param conf {@link Configuration} to read for the archive directory name
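A short sketch of how the new helper composes the archive location; the table, region and family names are made up, and the archive root name comes from the configuration:

  // <archive root>/<table>/<encoded region>/<family>
  Path storeArchive = HFileArchiveUtil.getStoreArchivePath(conf,
      "mytable", "0123456789abcdef0123456789abcdef", "cf");
  // This is also one of the locations an HFileLink probes once the file has been archived.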
@@ -103,6 +118,19 @@
        archiveName), tabledir.getName());
  }

  /**
   * Get the path to the table archive directory based on the configured archive directory.
   * <p>
   * It is assumed that the table is already archived.
   * @param conf {@link Configuration} to read the archive directory property. Can be null
   * @param tableName Name of the table to be archived. Cannot be null.
   * @return {@link Path} to the archive directory for the table
   */
  public static Path getTableArchivePath(final Configuration conf, final String tableName)
      throws IOException {
    return new Path(getArchivePath(conf), tableName);
  }

  /**
   * Get the archive directory as per the configuration
   * @param conf {@link Configuration} to read the archive directory from (can be null, in which
@@ -0,0 +1,244 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import junit.framework.TestCase;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.io.FileLink;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Test that FileLink switches between alternate locations
 * when the current location moves or gets deleted.
 */
@Category(MediumTests.class)
public class TestFileLink {
  /**
   * Test, on HDFS, that the FileLink is still readable
   * even when the current file gets renamed.
   */
  @Test
  public void testHDFSLinkReadDuringRename() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Test, on a local filesystem, that the FileLink is still readable
   * even when the current file gets renamed.
   */
  @Test
  public void testLocalLinkReadDuringRename() throws IOException {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    FileSystem fs = testUtil.getTestFileSystem();
    assertEquals("file", fs.getUri().getScheme());
    testLinkReadDuringRename(fs, testUtil.getDataTestDir());
  }

  /**
   * Test that link is still readable even when the current file gets renamed.
   */
  private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
    Path originalPath = new Path(rootDir, "test.file");
    Path archivedPath = new Path(rootDir, "archived.file");

    writeSomeData(fs, originalPath, 256 << 20, (byte)2);

    List<Path> files = new ArrayList<Path>();
    files.add(originalPath);
    files.add(archivedPath);

    FileLink link = new FileLink(files);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] data = new byte[8192];
      long size = 0;

      // Read from origin
      int n = in.read(data);
      dataVerify(data, n, (byte)2);
      size += n;

      // Move origin to archive
      assertFalse(fs.exists(archivedPath));
      fs.rename(originalPath, archivedPath);
      assertFalse(fs.exists(originalPath));
      assertTrue(fs.exists(archivedPath));

      // Try to read to the end
      while ((n = in.read(data)) > 0) {
        dataVerify(data, n, (byte)2);
        size += n;
      }

      assertEquals(256 << 20, size);
    } finally {
      in.close();
      if (fs.exists(originalPath)) fs.delete(originalPath);
      if (fs.exists(archivedPath)) fs.delete(archivedPath);
    }
  }

  /**
   * Test that link is still readable even when the current file gets deleted.
   *
   * NOTE: This test is valid only on HDFS.
   * When a file is deleted from a local file-system, it is simply 'unlinked'.
   * The inode, which contains the file's data, is not deleted until all
   * processes have finished with it.
   * In HDFS, when the request exceeds the cached block locations,
   * a query to the namenode is performed, using the filename,
   * and the deleted file doesn't exist anymore (FileNotFoundException).
   */
  @Test
  public void testHDFSLinkReadDuringDelete() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      List<Path> files = new ArrayList<Path>();
      for (int i = 0; i < 3; i++) {
        Path path = new Path(String.format("test-data-%d", i));
        writeSomeData(fs, path, 1 << 20, (byte)i);
        files.add(path);
      }

      FileLink link = new FileLink(files);
      FSDataInputStream in = link.open(fs);
      try {
        byte[] data = new byte[8192];
        int n;

        // Switch to file 1
        n = in.read(data);
        dataVerify(data, n, (byte)0);
        fs.delete(files.get(0));
        skipBuffer(in, (byte)0);

        // Switch to file 2
        n = in.read(data);
        dataVerify(data, n, (byte)1);
        fs.delete(files.get(1));
        skipBuffer(in, (byte)1);

        // Switch to file 3
        n = in.read(data);
        dataVerify(data, n, (byte)2);
        fs.delete(files.get(2));
        skipBuffer(in, (byte)2);

        // No more files available
        try {
          n = in.read(data);
          assert(n <= 0);
        } catch (FileNotFoundException e) {
          assertTrue(true);
        }
      } finally {
        in.close();
      }
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Write up to 'size' bytes with value 'v' into a new file called 'path'.
   */
  private void writeSomeData (FileSystem fs, Path path, long size, byte v) throws IOException {
    byte[] data = new byte[4096];
    for (int i = 0; i < data.length; i++) {
      data[i] = v;
    }

    FSDataOutputStream stream = fs.create(path);
    try {
      long written = 0;
      while (written < size) {
        stream.write(data, 0, data.length);
        written += data.length;
      }
    } finally {
      stream.close();
    }
  }

  /**
   * Verify that all bytes in 'data' have 'v' as value.
   */
  private static void dataVerify(byte[] data, int n, byte v) {
    for (int i = 0; i < n; ++i) {
      assertEquals(v, data[i]);
    }
  }

  private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
    byte[] data = new byte[8192];
    try {
      int n;
      while ((n = in.read(data)) == data.length) {
        for (int i = 0; i < data.length; ++i) {
          if (data[i] != v)
            throw new Exception("File changed");
        }
      }
    } catch (Exception e) {
    }
  }
}
@@ -0,0 +1,178 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test the HFileLink Cleaner.
 * HFiles with links cannot be deleted while a link is still present.
 */
@Category(SmallTests.class)
public class TestHFileLinkCleaner {

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Test
  public void testHFileLinkCleaning() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(HConstants.HBASE_DIR, TEST_UTIL.getDataTestDir().toString());
    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
        "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner," +
        "org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner");
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = FileSystem.get(conf);

    final String tableName = "test-table";
    final String tableLinkName = "test-link";
    final String hfileName = "1234567890";
    final String familyName = "cf";

    HRegionInfo hri = new HRegionInfo(Bytes.toBytes(tableName));
    HRegionInfo hriLink = new HRegionInfo(Bytes.toBytes(tableLinkName));

    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
        tableName, hri.getEncodedName(), familyName);
    Path archiveLinkStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
        tableLinkName, hriLink.getEncodedName(), familyName);

    // Create hfile /hbase/table-link/region/cf/getEncodedName.HFILE(conf);
    Path familyPath = getFamilyDirPath(archiveDir, tableName, hri.getEncodedName(), familyName);
    fs.mkdirs(familyPath);
    Path hfilePath = new Path(familyPath, hfileName);
    fs.createNewFile(hfilePath);

    // Create link to hfile
    Path familyLinkPath = getFamilyDirPath(rootDir, tableLinkName,
        hriLink.getEncodedName(), familyName);
    fs.mkdirs(familyLinkPath);
    HFileLink.create(conf, fs, familyLinkPath, hri, hfileName);
    Path linkBackRefDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
    assertTrue(fs.exists(linkBackRefDir));
    FileStatus[] backRefs = fs.listStatus(linkBackRefDir);
    assertEquals(1, backRefs.length);
    Path linkBackRef = backRefs[0].getPath();

    // Initialize cleaner
    final long ttl = 1000;
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
    Server server = new DummyServer();
    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir);

    // Link backref cannot be removed
    Thread.sleep(ttl * 2);
    cleaner.chore();
    assertTrue(fs.exists(linkBackRef));
    assertTrue(fs.exists(hfilePath));

    // Link backref can be removed
    fs.rename(new Path(rootDir, tableLinkName), new Path(archiveDir, tableLinkName));
    Thread.sleep(ttl * 2);
    cleaner.chore();
    assertFalse("Link should be deleted", fs.exists(linkBackRef));

    // HFile can be removed
    Thread.sleep(ttl * 2);
    cleaner.chore();
    assertFalse("HFile should be deleted", fs.exists(hfilePath));

    // Remove everything
    for (int i = 0; i < 4; ++i) {
      Thread.sleep(ttl * 2);
      cleaner.chore();
    }
    assertFalse("HFile should be deleted", fs.exists(new Path(archiveDir, tableName)));
    assertFalse("Link should be deleted", fs.exists(new Path(archiveDir, tableLinkName)));

    cleaner.interrupt();
  }

  private static Path getFamilyDirPath (final Path rootDir, final String table,
      final String region, final String family) {
    return new Path(new Path(new Path(rootDir, table), region), family);
  }

  static class DummyServer implements Server {

    @Override
    public Configuration getConfiguration() {
      return TEST_UTIL.getConfiguration();
    }

    @Override
    public ZooKeeperWatcher getZooKeeper() {
      try {
        return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return null;
    }

    @Override
    public CatalogTracker getCatalogTracker() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return new ServerName("regionserver,60020,000000");
    }

    @Override
    public void abort(String why, Throwable e) {}

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {}

    @Override
    public boolean isStopped() {
      return false;
    }
  }
}
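To summarise the contract this test exercises, a hedged sketch of the cleaner-side check; the isFileDeletable() method name is assumed from the HFileLinkCleaner plugin added by this patch:

  // An archived hfile stays as long as its back-reference directory still has entries.
  Path backRefDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
  FileStatus[] refs = FSUtils.listStatus(fs, backRefDir, null);
  boolean deletable = (refs == null || refs.length == 0);
  // HFileLinkCleaner.isFileDeletable(hfilePath) is expected to mirror this decision.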
@@ -35,9 +35,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@@ -171,6 +174,40 @@ public class TestStoreFile extends HBaseTestCase {
    assertTrue(Bytes.equals(kv.getRow(), finalRow));
  }

  public void testHFileLink() throws IOException {
    HRegionInfo hri = new HRegionInfo(Bytes.toBytes("table-link"));
    Path storedir = new Path(new Path(FSUtils.getRootDir(conf),
      new Path(hri.getTableNameAsString(), hri.getEncodedName())), "cf");

    // Make a store file and write data to it.
    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf,
         this.fs, 8 * 1024)
            .withOutputDir(storedir)
            .build();
    Path storeFilePath = writer.getPath();
    writeStoreFile(writer);
    writer.close();

    Path dstPath = new Path(FSUtils.getRootDir(conf), new Path("test-region", "cf"));
    HFileLink.create(conf, this.fs, dstPath, hri, storeFilePath.getName());
    Path linkFilePath = new Path(dstPath,
        HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

    // Try to open store file from link
    StoreFile hsf = new StoreFile(this.fs, linkFilePath, conf, cacheConf,
        StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
    assertTrue(hsf.isLink());

    // Now confirm that I can read from the link
    int count = 1;
    HFileScanner s = hsf.createReader().getScanner(false, false);
    s.seekTo();
    while (s.next()) {
      count++;
    }
    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
  }

  private void checkHalfHFile(final StoreFile f)
  throws IOException {
    byte [] midkey = f.createReader().midkey();