HBASE-12550 Check all storefiles are referenced before splitting
Summary:
If there are bugs in HDFS move and/or create we should protect against them
by making sure that all files referenced end up in split daughters.

Test Plan: Unit tests cover splits pretty well

Subscribers: matteobertozzi

Differential Revision: https://reviews.facebook.net/D29373
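The guard reduces to one invariant: the number of reference files written under
the parent's .splits directory must equal what lands in each daughter's region
directory after the commit rename. A minimal sketch of that check in plain Java
(countRefs and its naming test are illustrative stand-ins, not HBase API):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.stream.Stream;

    public class ReferenceCountGuard {
      // Stand-in for counting reference files under a region directory.
      static long countRefs(Path dir) throws IOException {
        try (Stream<Path> s = Files.walk(dir)) {
          // HBase reference files are named "<hfile>.<parent encoded name>";
          // this contains(".") test is a crude stand-in for
          // StoreFileInfo.isReference.
          return s.filter(Files::isRegularFile)
                  .filter(p -> p.getFileName().toString().contains("."))
                  .count();
        }
      }

      // Mirrors the intent of SplitTransaction.assertReferenceFileCount in the
      // patch below: fail the split loudly rather than silently lose a file.
      static void assertReferenceFileCount(long expected, Path dir) throws IOException {
        if (expected != 0 && expected != countRefs(dir)) {
          throw new IOException("Failing split. Expected reference file count isn't equal.");
        }
      }
    }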
parent 336c22d581
commit 0df5ed2ca6
HRegion.java
@@ -4805,11 +4805,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   /**
    * Create a daughter region from given a temp directory with the region data.
    * @param hri Spec. for daughter region to open.
+   * @param expectedReferenceFileCount
    * @throws IOException
    */
-  HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
+  HRegion createDaughterRegionFromSplits(final HRegionInfo hri, int expectedReferenceFileCount) throws IOException {
     // Move the files from the temporary .splits to the final /table/region directory
-    fs.commitDaughterRegion(hri);
+    fs.commitDaughterRegion(hri, expectedReferenceFileCount);

     // Create the daughter HRegion instance
     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
HRegionFileSystem.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;

@@ -247,13 +246,7 @@ public class HRegionFileSystem {
    */
   public boolean hasReferences(final String familyName) throws IOException {
     FileStatus[] files = FSUtils.listStatus(fs, getStoreDir(familyName),
-        new PathFilter () {
-          @Override
-          public boolean accept(Path path) {
-            return StoreFileInfo.isReference(path);
-          }
-        }
-    );
+        new FSUtils.ReferenceFileFilter(fs));
     return files != null && files.length > 0;
   }

@@ -523,13 +516,19 @@ public class HRegionFileSystem {
   /**
    * Commit a daughter region, moving it from the split temporary directory
    * to the proper location in the filesystem.
-   * @param regionInfo daughter {@link HRegionInfo}
+   *
+   * @param regionInfo daughter {@link org.apache.hadoop.hbase.HRegionInfo}
+   * @param expectedReferenceFileCount number of expected reference files to have created and to
+   *                                   move into the new location.
    * @throws IOException
    */
-  Path commitDaughterRegion(final HRegionInfo regionInfo) throws IOException {
+  Path commitDaughterRegion(final HRegionInfo regionInfo, int expectedReferenceFileCount)
+      throws IOException {
     Path regionDir = new Path(this.tableDir, regionInfo.getEncodedName());
     Path daughterTmpDir = this.getSplitsDir(regionInfo);

     if (fs.exists(daughterTmpDir)) {

       // Write HRI to a file in case we need to recover hbase:meta
       Path regionInfoFile = new Path(daughterTmpDir, REGION_INFO_FILE);
       byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);

@@ -540,6 +539,7 @@ public class HRegionFileSystem {
         throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
       }
     }
+
     return regionDir;
   }
SplitTransaction.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;

@@ -45,7 +46,9 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.zookeeper.KeeperException;

@@ -162,8 +165,8 @@ public class SplitTransaction {
   }

   static class JournalEntry {
-    public JournalEntryType type;
-    public long timestamp;
+    private JournalEntryType type;
+    private long timestamp;

     public JournalEntry(JournalEntryType type) {
       this(type, EnvironmentEdgeManager.currentTime());

@@ -380,21 +383,40 @@ public class SplitTransaction {
     // splitStoreFiles creates daughter region dirs under the parent splits dir
     // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
     // clean this up.
-    splitStoreFiles(hstoreFilesToSplit);
+    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);

     // Log to the journal that we are creating region A, the first daughter
     // region. We could fail halfway through. If we do, we could have left
     // stuff in fs that needs cleanup -- a storefile or two. Thats why we
     // add entry to journal BEFORE rather than AFTER the change.
     this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
-    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
+    assertReferenceFileCount(expectedReferences.getFirst(),
+      this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
+    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a,
+      expectedReferences.getFirst());
+    assertReferenceFileCount(expectedReferences.getFirst(),
+      new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));

     // Ditto
     this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
-    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
+    assertReferenceFileCount(expectedReferences.getSecond(),
+      this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
+    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b,
+      expectedReferences.getSecond());
+    assertReferenceFileCount(expectedReferences.getSecond(),
+      new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));

     return new PairOfSameType<HRegion>(a, b);
   }

+  void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
+      throws IOException {
+    if (expectedReferenceFileCount != 0 &&
+        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(this.parent.getFilesystem(), dir)) {
+      throw new IOException("Failing split. Expected reference file count isn't equal.");
+    }
+  }
+
   /**
    * Perform time consuming opening of the daughter regions.
    * @param server Hosting server instance. Can be null when testing

@@ -570,7 +592,14 @@ public class SplitTransaction {
     }
   }

-  private void splitStoreFiles(final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
+  /**
+   * Creates reference files for top and bottom half of the
+   * @param hstoreFilesToSplit map of store files to create half file references for.
+   * @return the number of reference files that were created.
+   * @throws IOException
+   */
+  private Pair<Integer, Integer> splitStoreFiles(
+      final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
       throws IOException {
     if (hstoreFilesToSplit == null) {
       // Could be null because close didn't succeed -- for now consider it fatal

@@ -582,14 +611,14 @@ public class SplitTransaction {
     int nbFiles = hstoreFilesToSplit.size();
     if (nbFiles == 0) {
       // no file needs to be splitted.
-      return;
+      return new Pair<Integer, Integer>(0,0);
     }
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
     builder.setNameFormat("StoreFileSplitter-%1$d");
     ThreadFactory factory = builder.build();
     ThreadPoolExecutor threadPool =
       (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
-    List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
+    List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);

     // Split each store file.
     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {

@@ -618,30 +647,38 @@ public class SplitTransaction {
       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
     }

+    int created_a = 0;
+    int created_b = 0;
     // Look for any exception
-    for (Future<Void> future: futures) {
+    for (Future<Pair<Path, Path>> future : futures) {
       try {
-        future.get();
+        Pair<Path, Path> p = future.get();
+        created_a += p.getFirst() != null ? 1 : 0;
+        created_b += p.getSecond() != null ? 1 : 0;
       } catch (InterruptedException e) {
-        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
       } catch (ExecutionException e) {
         throw new IOException(e);
       }
     }
+
+    return new Pair<Integer, Integer>(created_a, created_b);
   }

-  private void splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
+  private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
     HRegionFileSystem fs = this.parent.getRegionFileSystem();
     String familyName = Bytes.toString(family);
-    fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false);
-    fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true);
+
+    Path path_a = fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false);
+    Path path_b = fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true);
+    return new Pair<Path,Path>(path_a, path_b);
   }

   /**
    * Utility class used to do the file splitting / reference writing
    * in parallel instead of sequentially.
    */
-  class StoreFileSplitter implements Callable<Void> {
+  class StoreFileSplitter implements Callable<Pair<Path,Path>> {
     private final byte[] family;
     private final StoreFile sf;

@@ -655,9 +692,8 @@ public class SplitTransaction {
       this.family = family;
     }

-    public Void call() throws IOException {
-      splitStoreFile(family, sf);
-      return null;
+    public Pair<Path,Path> call() throws IOException {
+      return splitStoreFile(family, sf);
     }
   }
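The splitStoreFiles change above also illustrates a reusable pattern: have each
Callable return what it produced, then derive the expected counts from the
futures instead of tracking them out of band. A self-contained sketch of that
tally (plain Java; this Pair is a local stand-in for
org.apache.hadoop.hbase.util.Pair, and the splitter tasks are simulated):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class SplitTally {
      static class Pair<A, B> {
        final A first; final B second;
        Pair(A first, B second) { this.first = first; this.second = second; }
      }

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Pair<String, String>>> futures = new ArrayList<Future<Pair<String, String>>>();
        for (int i = 0; i < 8; i++) {
          final int n = i;
          // Simulated StoreFileSplitter: pretend odd-numbered files yield no
          // top-half reference, so the two counts can legitimately differ,
          // just as splitStoreFile may return a null path for one half.
          futures.add(pool.submit(() -> new Pair<String, String>(n % 2 == 0 ? "a-" + n : null, "b-" + n)));
        }
        int createdA = 0, createdB = 0;
        for (Future<Pair<String, String>> f : futures) {
          Pair<String, String> p = f.get();
          createdA += p.first != null ? 1 : 0;
          createdB += p.second != null ? 1 : 0;
        }
        pool.shutdown();
        System.out.println("expected references: a=" + createdA + ", b=" + createdB);
      }
    }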
FSUtils.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.RegionPlacementMaintainer;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;

@@ -1409,13 +1410,20 @@ public abstract class FSUtils {
     return familyDirs;
   }

+  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
+    FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
+    List<Path> referenceFiles = new ArrayList<Path>(fds.length);
+    for (FileStatus fdfs: fds) {
+      Path fdPath = fdfs.getPath();
+      referenceFiles.add(fdPath);
+    }
+    return referenceFiles;
+  }
+
   /**
    * Filter for HFiles that excludes reference files.
    */
   public static class HFileFilter implements PathFilter {
     // This pattern will accept 0.90+ style hex hfies files but reject reference files
     final public static Pattern hfilePattern = Pattern.compile("^([0-9a-f]+)$");

     final FileSystem fs;

     public HFileFilter(FileSystem fs) {

@@ -1424,13 +1432,9 @@ public abstract class FSUtils {

     @Override
     public boolean accept(Path rd) {
-      if (!hfilePattern.matcher(rd.getName()).matches()) {
-        return false;
-      }
-
       try {
         // only files
-        return !fs.getFileStatus(rd).isDirectory();
+        return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isHFile(rd);
       } catch (IOException ioe) {
         // Maybe the file was moved or the fs was disconnected.
         LOG.warn("Skipping file " + rd +" due to IOException", ioe);

@@ -1439,6 +1443,28 @@ public abstract class FSUtils {
     }
   }

+  public static class ReferenceFileFilter implements PathFilter {
+
+    private final FileSystem fs;
+
+    public ReferenceFileFilter(FileSystem fs) {
+      this.fs = fs;
+    }
+
+    @Override
+    public boolean accept(Path rd) {
+      try {
+        // only files can be references.
+        return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isReference(rd);
+      } catch (IOException ioe) {
+        // Maybe the file was moved or the fs was disconnected.
+        LOG.warn("Skipping file " + rd +" due to IOException", ioe);
+        return false;
+      }
+    }
+  }
+
   /**
    * @param conf
    * @return Returns the filesystem of the hbase rootdir.

@@ -1496,6 +1522,18 @@ public abstract class FSUtils {
     return map;
   }

+  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
+    int result = 0;
+    try {
+      for (Path familyDir:getFamilyDirs(fs, p)){
+        result += getReferenceFilePaths(fs, familyDir).size();
+      }
+    } catch (IOException e) {
+      LOG.warn("Error Counting reference files.", e);
+    }
+    return result;
+  }
+
   /**
    * Runs through the HBase rootdir and creates a reverse lookup map for
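A hypothetical caller wiring the new FSUtils helpers together (the region path
is made up for illustration; getRegionReferenceFileCount swallows IOExceptions
itself, per the hunk above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class CountRefsExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path regionDir = new Path("/hbase/data/default/t1/exampleEncodedRegion"); // made-up
        // Sum of ReferenceFileFilter matches across every family dir.
        int refs = FSUtils.getRegionReferenceFileCount(fs, regionDir);
        System.out.println(refs + " reference files under " + regionDir);
        for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
          for (Path ref : FSUtils.getReferenceFilePaths(fs, familyDir)) {
            System.out.println("  " + ref);
          }
        }
      }
    }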
TestSplitTransaction.java
@@ -21,6 +21,12 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;

@@ -250,6 +256,35 @@ public class TestSplitTransaction {
     assertTrue(!this.parent.lock.writeLock().isHeldByCurrentThread());
   }

+  @Test
+  public void testCountReferencesFailsSplit() throws IOException {
+    final int rowcount = TEST_UTIL.loadRegion(this.parent, CF);
+    assertTrue(rowcount > 0);
+    int parentRowCount = countRows(this.parent);
+    assertEquals(rowcount, parentRowCount);
+
+    // Start transaction.
+    HRegion spiedRegion = spy(this.parent);
+    SplitTransaction st = prepareGOOD_SPLIT_ROW(spiedRegion);
+    SplitTransaction spiedUponSt = spy(st);
+    doThrow(new IOException("Failing split. Expected reference file count isn't equal."))
+        .when(spiedUponSt).assertReferenceFileCount(anyInt(),
+        eq(new Path(this.parent.getRegionFileSystem().getTableDir(),
+        st.getSecondDaughter().getEncodedName())));
+
+    // Run the execute. Look at what it returns.
+    boolean expectedException = false;
+    Server mockServer = Mockito.mock(Server.class);
+    when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
+    try {
+      spiedUponSt.execute(mockServer, null);
+    } catch (IOException e) {
+      expectedException = true;
+    }
+    assertTrue(expectedException);
+  }
+
+
   @Test public void testRollback() throws IOException {
     final int rowcount = TEST_UTIL.loadRegion(this.parent, CF);
     assertTrue(rowcount > 0);

@@ -260,8 +295,10 @@ public class TestSplitTransaction {
     HRegion spiedRegion = spy(this.parent);
     SplitTransaction st = prepareGOOD_SPLIT_ROW(spiedRegion);
     SplitTransaction spiedUponSt = spy(st);
-    when(spiedRegion.createDaughterRegionFromSplits(spiedUponSt.getSecondDaughter())).
-      thenThrow(new MockedFailedDaughterCreation());
+    doNothing().when(spiedUponSt).assertReferenceFileCount(anyInt(),
+      eq(parent.getRegionFileSystem().getSplitsDir(st.getFirstDaughter())));
+    when(spiedRegion.createDaughterRegionFromSplits(spiedUponSt.getSecondDaughter(), 1)).
+      thenThrow(new MockedFailedDaughterCreation());
     // Run the execute. Look at what it returns.
     boolean expectedException = false;
     Server mockServer = Mockito.mock(Server.class);
TestStoreFile.java
@@ -959,7 +959,7 @@ public class TestStoreFile extends HBaseTestCase {
     if (null == path) {
       return null;
     }
-    Path regionDir = regionFs.commitDaughterRegion(hri);
+    Path regionDir = regionFs.commitDaughterRegion(hri, 1);
     return new Path(new Path(regionDir, family), path.getName());
   }