HBASE-12485 Maintain SeqId monotonically increasing

Jeffrey Zhong 2014-12-03 18:12:18 -08:00
parent 84b41f8029
commit f67162547d
8 changed files with 144 additions and 33 deletions

HRegion.java

@@ -778,13 +778,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     this.lastFlushTime = EnvironmentEdgeManager.currentTime();
     // Use maximum of wal sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
-    long nextSeqid = maxSeqId + 1;
-    if (this.isRecovering) {
-      // In distributedLogReplay mode, we don't know the last change sequence number because region
-      // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
-      // overlaps used sequence numbers
-      nextSeqid = WALSplitter.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
-        this.fs.getRegionDir(), nextSeqid, (this.flushPerChanges + 10000000));
+    long nextSeqid = maxSeqId;
+
+    // In distributedLogReplay mode, we don't know the last change sequence number because region
+    // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
+    // overlaps used sequence numbers
+    if (this.writestate.writesEnabled) {
+      nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
+          .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
+    } else {
+      nextSeqid++;
    }

     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
@@ -912,6 +915,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       getRegionServerServices().getServerName(), storeFiles);
     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
       getSequenceId());
+
+    // Store SeqId in HDFS when a region closes
+    // checking region folder exists is due to many tests which delete the table folder while a
+    // table is still online
+    if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
+      WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
+        getSequenceId().get(), 0);
+    }
   }

   /**
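Note: writing the seqid marker at close time (with a safety bumper of 0) is what keeps sequence ids monotonic across a clean close and reopen even when flushed store files lag the WAL high-water mark. A worked sketch with hypothetical numbers, combining this hunk with the open path above:

// Sketch: close-then-reopen round trip under the new scheme.
public class CloseReopenSketch {
  public static void main(String[] args) {
    long lastSeqIdAtClose = 2041L; // getSequenceId().get() when the region closes
    // close: writeRegionSequenceIdFile(fs, regiondir, 2041, 0) leaves "2041.seqid"
    long storeMaxOnReopen = 1800L; // store files can lag the WAL high-water mark
    // reopen (writes enabled, not recovering): max of the two, bumped by 1
    long nextSeqId = Math.max(storeMaxOnReopen, lastSeqIdAtClose) + 1L;
    System.out.println(nextSeqId); // 2042: strictly above every id ever used
  }
}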
@@ -1293,7 +1304,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }

     status.setStatus("Writing region close event to WAL");
-    if (!abort && wal != null && getRegionServerServices() != null) {
+    if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
       writeRegionCloseMarker(wal);
     }
@@ -3593,7 +3604,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       if (firstSeqIdInLog == -1) {
         firstSeqIdInLog = key.getLogSeqNum();
       }
-      currentEditSeqId = key.getLogSeqNum();
+      if (currentEditSeqId > key.getLogSeqNum()) {
+        // when this condition is true, it means we have a serious defect because we need to
+        // maintain increasing SeqId for WAL edits per region
+        LOG.error("Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
+            + "; edit=" + val);
+      } else {
+        currentEditSeqId = key.getLogSeqNum();
+      }
       currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
         key.getOrigLogSeqNum() : currentEditSeqId;
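Note: the guard above turns a silent regression into a loud error while keeping the replay high-water mark. A self-contained illustration with made-up seqids:

// Standalone illustration of the decreasing-SeqId guard during replay.
public class ReplayGuardSketch {
  public static void main(String[] args) {
    long currentEditSeqId = -1;
    for (long logSeqNum : new long[] {10, 11, 15, 12, 16}) { // 12 is out of order
      if (currentEditSeqId > logSeqNum) {
        // Decreasing seqid: the per-region monotonicity invariant was broken
        // upstream; report it but do not move the high-water mark backwards.
        System.err.println("Found decreasing SeqId. PreId=" + currentEditSeqId
            + " key=" + logSeqNum);
      } else {
        currentEditSeqId = logSeqNum;
      }
    }
    System.out.println("currentEditSeqId=" + currentEditSeqId); // 16
  }
}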

FSUtils.java

@@ -1508,6 +1508,9 @@ public abstract class FSUtils {
       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
       for (FileStatus familyDir : familyDirs) {
         Path family = familyDir.getPath();
+        if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
+          continue;
+        }
         // now in family, iterate over the StoreFiles and
         // put in map
         FileStatus[] familyStatus = fs.listStatus(family);

WALSplitter.java

@@ -57,6 +57,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -124,6 +125,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;

 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
@@ -613,6 +615,10 @@ public class WALSplitter {
         if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
           result = false;
         }
+        // Skip SeqId Files
+        if (isSequenceIdFile(p)) {
+          result = false;
+        }
       } catch (IOException e) {
         LOG.warn("Failed isFile check on " + p);
       }
@@ -647,19 +653,21 @@ public class WALSplitter {
     return moveAsideName;
   }

-  private static final String SEQUENCE_ID_FILE_SUFFIX = "_seqid";
+  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
+  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
+  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();

   /**
    * Is the given file a region open sequence id file.
    */
   @VisibleForTesting
   public static boolean isSequenceIdFile(final Path file) {
-    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX);
+    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
+        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
   }

   /**
    * Create a file with name as region open sequence id
-   *
    * @param fs
    * @param regiondir
    * @param newSeqId
@@ -667,10 +675,10 @@ public class WALSplitter {
    * @return long new sequence Id value
    * @throws IOException
    */
-  public static long writeRegionOpenSequenceIdFile(final FileSystem fs, final Path regiondir,
+  public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
       long newSeqId, long saftyBumper) throws IOException {
-    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
     long maxSeqId = 0;
     FileStatus[] files = null;
     if (fs.exists(editsdir)) {
@@ -685,7 +693,7 @@ public class WALSplitter {
         String fileName = status.getPath().getName();
         try {
           Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
-            - SEQUENCE_ID_FILE_SUFFIX.length()));
+            - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
           maxSeqId = Math.max(tmpSeqId, maxSeqId);
         } catch (NumberFormatException ex) {
           LOG.warn("Invalid SeqId File Name=" + fileName);
@@ -697,15 +705,28 @@ public class WALSplitter {
       newSeqId = maxSeqId;
     }
     newSeqId += saftyBumper; // bump up SeqId
     // write a new seqId file
     Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
-    if (!fs.createNewFile(newSeqIdFile)) {
-      throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
+    if (newSeqId != maxSeqId) {
+      try {
+        if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
+          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Written region seqId to file:" + newSeqIdFile + " ,newSeqId=" + newSeqId
+              + " ,maxSeqId=" + maxSeqId);
+        }
+      } catch (FileAlreadyExistsException ignored) {
+        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
+      }
     }

     // remove old ones
-    if(files != null) {
+    if (files != null) {
       for (FileStatus status : files) {
+        if (newSeqIdFile.equals(status.getPath())) {
+          continue;
+        }
         fs.delete(status.getPath(), false);
       }
     }
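Note: taken together, the renamed writeRegionSequenceIdFile scans recovered.edits for existing markers (both the new ".seqid" and the legacy "_seqid" suffix), takes the max of the on-disk value and the caller's seqid, adds the bumper, writes the new marker idempotently, and deletes the stale ones. Below is a runnable simulation of that file-name protocol using a plain list instead of HDFS; the values are hypothetical and the simplified clear-then-add stands in for the skip-the-new-file deletion loop above.

// Simulates the <seqid>.seqid marker protocol with an in-memory directory.
import java.util.ArrayList;
import java.util.List;

public class SeqIdFileSketch {
  static long writeSeqIdFile(List<String> editsDir, long newSeqId, long saftyBumper) {
    long maxSeqId = 0;
    for (String name : editsDir) {
      // Both ".seqid" and the legacy "_seqid" suffix are six characters long.
      if (name.endsWith(".seqid") || name.endsWith("_seqid")) {
        maxSeqId = Math.max(maxSeqId,
            Long.parseLong(name.substring(0, name.length() - 6)));
      }
    }
    if (newSeqId < maxSeqId) {
      newSeqId = maxSeqId;
    }
    newSeqId += saftyBumper;
    editsDir.clear();                  // "remove old ones"
    editsDir.add(newSeqId + ".seqid"); // write the new marker
    return newSeqId;
  }

  public static void main(String[] args) {
    List<String> dir = new ArrayList<>();
    dir.add("1001_seqid");                              // legacy marker left behind
    System.out.println(writeSeqIdFile(dir, 1L, 1000L)); // 2001, dir = [2001.seqid]
    System.out.println(writeSeqIdFile(dir, 1L, 1000L)); // 3001, dir = [3001.seqid]
  }
}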

TestHFileArchiving.java

@@ -145,7 +145,16 @@ public class TestHFileArchiving {
     assertTrue(fs.exists(archiveDir));

     // check to make sure the store directory was copied
-    FileStatus[] stores = fs.listStatus(archiveDir);
+    // check to make sure the store directory was copied
+    FileStatus[] stores = fs.listStatus(archiveDir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) {
+          return false;
+        }
+        return true;
+      }
+    });
     assertTrue(stores.length == 1);

     // make sure we archived the store files
@@ -413,7 +422,15 @@ public class TestHFileArchiving {
    * @throws IOException
    */
   private List<String> getAllFileNames(final FileSystem fs, Path archiveDir) throws IOException {
-    FileStatus[] files = FSUtils.listStatus(fs, archiveDir, null);
+    FileStatus[] files = FSUtils.listStatus(fs, archiveDir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) {
+          return false;
+        }
+        return true;
+      }
+    });
     return recurseOnFiles(fs, files, new ArrayList<String>());
   }
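Note: the anonymous PathFilter introduced here reappears in every remaining test file in this commit. A shared helper (hypothetical, not part of the commit) would state the intent once:

// Hypothetical helper consolidating the filters repeated across these tests.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.wal.WALSplitter;

public final class SeqIdTestFilters {
  /** Skips the recovered.edits directory when listing a region directory. */
  public static final PathFilter SKIP_RECOVERED_EDITS = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      return !p.getName().contains(HConstants.RECOVERED_EDITS_DIR);
    }
  };

  /** Skips <seqid>.seqid marker files when listing recovered.edits. */
  public static final PathFilter SKIP_SEQID_FILES = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      return !WALSplitter.isSequenceIdFile(p);
    }
  };

  private SeqIdTestFilters() {}
}

Usage would then read, e.g., fs.listStatus(editsdir, SeqIdTestFilters.SKIP_SEQID_FILES).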

TestDistributedLogSplitting.java

@@ -261,7 +261,15 @@ public class TestDistributedLogSplitting {
       Path editsdir =
         WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
       LOG.debug("checking edits dir " + editsdir);
-      FileStatus[] files = fs.listStatus(editsdir);
+      FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          if (WALSplitter.isSequenceIdFile(p)) {
+            return false;
+          }
+          return true;
+        }
+      });
       assertTrue("edits dir should have more than a single file in it. instead has " + files.length,
         files.length > 1);
       for (int i = 0; i < files.length; i++) {
@@ -842,7 +850,15 @@ public class TestDistributedLogSplitting {
         WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
       LOG.debug("checking edits dir " + editsdir);
       if(!fs.exists(editsdir)) continue;
-      FileStatus[] files = fs.listStatus(editsdir);
+      FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          if (WALSplitter.isSequenceIdFile(p)) {
+            return false;
+          }
+          return true;
+        }
+      });
       if(files != null) {
         for(FileStatus file : files) {
           int c = countWAL(file.getPath(), fs, conf);
@@ -1385,11 +1401,10 @@
     FileSystem fs = master.getMasterFileSystem().getFileSystem();
     Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table"));
     List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
-    WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
-    // current SeqId file has seqid=1001
-    WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
-    // current SeqId file has seqid=2001
-    assertEquals(3001, WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));
+    long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
+    WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
+    assertEquals(newSeqId + 2000,
+      WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));

     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0));
     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
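Note: the assertion no longer hard-codes 3001; the expectation follows from the bumper arithmetic. A worked trace of the three calls in the test above:

// Worked trace of the three writeRegionSequenceIdFile calls (pure arithmetic).
public class SeqIdTestTrace {
  public static void main(String[] args) {
    long first  = Math.max(1L, 0L) + 1000L;     // no prior marker: 1001
    long second = Math.max(1L, first) + 1000L;  // marker 1001 wins: 2001
    long third  = Math.max(3L, second) + 1000L; // marker 2001 wins: 3001
    System.out.println(third == first + 2000L); // true: the assertEquals above
  }
}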

TestTableDeleteFamilyHandler.java

@@ -28,6 +28,8 @@ import java.io.IOException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -122,7 +125,15 @@ public class TestTableDeleteFamilyHandler {
     FileStatus[] fileStatus = fs.listStatus(tableDir);
     for (int i = 0; i < fileStatus.length; i++) {
       if (fileStatus[i].isDirectory() == true) {
-        FileStatus[] cf = fs.listStatus(fileStatus[i].getPath());
+        FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
+          @Override
+          public boolean accept(Path p) {
+            if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) {
+              return false;
+            }
+            return true;
+          }
+        });
         int k = 1;
         for (int j = 0; j < cf.length; j++) {
           if (cf[j].isDirectory() == true
@@ -149,7 +160,15 @@ public class TestTableDeleteFamilyHandler {
     fileStatus = fs.listStatus(tableDir);
     for (int i = 0; i < fileStatus.length; i++) {
       if (fileStatus[i].isDirectory() == true) {
-        FileStatus[] cf = fs.listStatus(fileStatus[i].getPath());
+        FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
+          @Override
+          public boolean accept(Path p) {
+            if (WALSplitter.isSequenceIdFile(p)) {
+              return false;
+            }
+            return true;
+          }
+        });
         for (int j = 0; j < cf.length; j++) {
           if (cf[j].isDirectory() == true) {
             assertFalse(cf[j].getPath().getName().equals("cf2"));

TestWALReplay.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -900,8 +901,16 @@ public class TestWALReplay {
     WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
       this.fs, this.conf, null, null, null, mode, wals);
     FileStatus[] listStatus1 = this.fs.listStatus(
-      new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
-        new Path(hri.getEncodedName(), "recovered.edits")));
+      new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
+          "recovered.edits")), new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          if (WALSplitter.isSequenceIdFile(p)) {
+            return false;
+          }
+          return true;
+        }
+      });
     int editCount = 0;
     for (FileStatus fileStatus : listStatus1) {
       editCount = Integer.parseInt(fileStatus.getPath().getName());

TestWALSplit.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -1163,7 +1164,15 @@ public class TestWALSplit {
     @SuppressWarnings("deprecation")
     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
       Bytes.toString(region.getBytes())));
-    FileStatus [] files = this.fs.listStatus(editsdir);
+    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (WALSplitter.isSequenceIdFile(p)) {
+          return false;
+        }
+        return true;
+      }
+    });
     Path[] paths = new Path[files.length];
     for (int i = 0; i < files.length; i++) {
       paths[i] = files[i].getPath();