From 75cf81aae5f956f9e4243cf415da85dbd068c5f7 Mon Sep 17 00:00:00 2001
From: Jeffrey Zhong
Date: Wed, 3 Dec 2014 18:12:18 -0800
Subject: [PATCH] HBASE-12485 Maintain SeqId monotonically increasing

---
 .../hadoop/hbase/regionserver/HRegion.java    | 36 ++++++++++++----
 .../org/apache/hadoop/hbase/util/FSUtils.java |  3 ++
 .../apache/hadoop/hbase/wal/WALSplitter.java  | 41 ++++++++++++++-----
 .../hbase/backup/TestHFileArchiving.java      | 21 +++++++++-
 .../master/TestDistributedLogSplitting.java   | 29 +++++++++----
 .../handler/TestTableDeleteFamilyHandler.java | 23 ++++++++++-
 .../hbase/regionserver/wal/TestWALReplay.java | 13 +++++-
 .../apache/hadoop/hbase/wal/TestWALSplit.java | 11 ++++-
 8 files changed, 144 insertions(+), 33 deletions(-)
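Note: the core mechanism of this patch is an empty marker file named
"<seqid>.seqid" in a region's recovered.edits directory. The file name alone
records the highest sequence id, the maximum over all marker files is read
back, and on region open a safety bump is added before the id is reused. The
standalone sketch below illustrates just that scheme; it substitutes local
java.io files for HDFS, and its class and method names are illustrative, not
HBase API.

    import java.io.File;
    import java.io.IOException;

    // Standalone sketch of the ".seqid" marker-file scheme (local files stand
    // in for HDFS; all names here are illustrative only).
    public class SeqIdFileSketch {
      static final String SUFFIX = ".seqid";

      // Highest sequence id recorded by any marker file in the directory.
      static long readMaxSeqId(File editsDir) {
        long max = 0;
        File[] files = editsDir.listFiles((dir, name) -> name.endsWith(SUFFIX));
        if (files != null) {
          for (File f : files) {
            try {
              max = Math.max(max, Long.parseLong(
                  f.getName().substring(0, f.getName().length() - SUFFIX.length())));
            } catch (NumberFormatException ignored) {
              // skip files that do not follow the <seqid>.seqid convention
            }
          }
        }
        return max;
      }

      // Record a new (bumped) sequence id as an empty marker file, as the
      // patch does on region open and close; only the file name matters.
      static long writeSeqIdFile(File editsDir, long newSeqId, long safetyBump)
          throws IOException {
        long max = readMaxSeqId(editsDir);
        if (newSeqId < max) {
          newSeqId = max;        // never move backwards
        }
        newSeqId += safetyBump;  // leave head room for edits not yet seen
        File marker = new File(editsDir, newSeqId + SUFFIX);
        if (!marker.createNewFile() && !marker.exists()) {
          throw new IOException("Failed to create " + marker);
        }
        return newSeqId;
      }

      public static void main(String[] args) throws IOException {
        File dir = new File(System.getProperty("java.io.tmpdir"), "recovered.edits");
        dir.mkdirs();
        System.out.println(writeSeqIdFile(dir, 1L, 1000L)); // 1001 on first run
        System.out.println(writeSeqIdFile(dir, 1L, 1000L)); // then 2001: bumps past 1001
      }
    }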
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 54cea5aec5a..33aa8de7096 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -780,13 +780,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     this.lastFlushTime = EnvironmentEdgeManager.currentTime();
     // Use maximum of wal sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
-    long nextSeqid = maxSeqId + 1;
-    if (this.isRecovering) {
-      // In distributedLogReplay mode, we don't know the last change sequence number because region
-      // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
-      // overlaps used sequence numbers
-      nextSeqid = WALSplitter.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
-        this.fs.getRegionDir(), nextSeqid, (this.flushPerChanges + 10000000));
+    long nextSeqid = maxSeqId;
+
+    // In distributedLogReplay mode, we don't know the last change sequence number because the
+    // region is opened before recovery completes. So we add a safety bumper to avoid the new
+    // sequence number overlapping already-used sequence numbers.
+    if (this.writestate.writesEnabled) {
+      nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
+          .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
+    } else {
+      nextSeqid++;
     }
 
     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
@@ -914,6 +917,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       getRegionServerServices().getServerName(), storeFiles);
     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
       getSequenceId());
+
+    // Store the SeqId in HDFS when a region closes.
+    // Checking that the region folder exists is due to many tests which delete the table folder
+    // while a table is still online.
+    if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
+      WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
+        getSequenceId().get(), 0);
+    }
   }
 
   /**
@@ -1295,7 +1306,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }
 
       status.setStatus("Writing region close event to WAL");
-      if (!abort && wal != null && getRegionServerServices() != null) {
+      if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
         writeRegionCloseMarker(wal);
       }
 
@@ -3588,7 +3599,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       if (firstSeqIdInLog == -1) {
         firstSeqIdInLog = key.getLogSeqNum();
       }
-      currentEditSeqId = key.getLogSeqNum();
+      if (currentEditSeqId > key.getLogSeqNum()) {
+        // When this condition is true, it means we have a serious defect because we need to
+        // maintain an increasing SeqId for WAL edits per region.
+        LOG.error("Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
+            + "; edit=" + val);
+      } else {
+        currentEditSeqId = key.getLogSeqNum();
+      }
       currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
         key.getOrigLogSeqNum() : currentEditSeqId;
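Note on the replay hunk above: once ids are persisted this way, a sequence id
read back from a WAL must never decrease within one region, so a smaller id
is logged as a defect instead of being adopted. A minimal, self-contained
illustration of that guard (plain Java, not HBase code; the sample ids are
made up):

    // Tracks the last accepted sequence id and flags any regression.
    public class MonotonicSeqIdGuard {
      public static void main(String[] args) {
        long currentEditSeqId = -1;
        for (long logSeqNum : new long[] { 3, 5, 4, 7 }) { // 4 is out of order
          if (currentEditSeqId > logSeqNum) {
            System.err.println("Found decreasing SeqId. PreId=" + currentEditSeqId
                + " key=" + logSeqNum);
          } else {
            currentEditSeqId = logSeqNum;
          }
        }
        // currentEditSeqId ends at 7; the out-of-order 4 was flagged, not applied.
      }
    }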
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index f7ebbc1575c..1def840cd17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1505,6 +1505,9 @@ public abstract class FSUtils {
     FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
     for (FileStatus familyDir : familyDirs) {
       Path family = familyDir.getPath();
+      if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
+        continue;
+      }
       // now in family, iterate over the StoreFiles and
       // put in map
       FileStatus[] familyStatus = fs.listStatus(family);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 2d6b5894d86..d7d4a613428 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -57,6 +57,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -130,6 +131,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
@@ -623,6 +625,10 @@ public class WALSplitter {
           if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
             result = false;
           }
+          // Skip SeqId Files
+          if (isSequenceIdFile(p)) {
+            result = false;
+          }
         } catch (IOException e) {
           LOG.warn("Failed isFile check on " + p);
         }
@@ -657,19 +663,21 @@ public class WALSplitter {
     return moveAsideName;
   }
 
-  private static final String SEQUENCE_ID_FILE_SUFFIX = "_seqid";
+  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
+  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
+  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
 
   /**
   * Is the given file a region open sequence id file.
   */
  @VisibleForTesting
  public static boolean isSequenceIdFile(final Path file) {
-    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX);
+    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
+        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
  }
 
  /**
   * Create a file with name as region open sequence id
-   *
   * @param fs
   * @param regiondir
   * @param newSeqId
   * @return long new sequence Id value
   * @throws IOException
   */
-  public static long writeRegionOpenSequenceIdFile(final FileSystem fs, final Path regiondir,
+  public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
      long newSeqId, long saftyBumper) throws IOException {
-    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
     long maxSeqId = 0;
     FileStatus[] files = null;
     if (fs.exists(editsdir)) {
@@ -695,7 +703,7 @@ public class WALSplitter {
         String fileName = status.getPath().getName();
         try {
           Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
-              - SEQUENCE_ID_FILE_SUFFIX.length()));
+              - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
           maxSeqId = Math.max(tmpSeqId, maxSeqId);
         } catch (NumberFormatException ex) {
           LOG.warn("Invalid SeqId File Name=" + fileName);
@@ -707,15 +715,28 @@ public class WALSplitter {
       newSeqId = maxSeqId;
     }
     newSeqId += saftyBumper; // bump up SeqId
-
+    // write a new seqId file
     Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
-    if (!fs.createNewFile(newSeqIdFile)) {
-      throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
+    if (newSeqId != maxSeqId) {
+      try {
+        if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
+          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Written region seqId to file:" + newSeqIdFile + " ,newSeqId=" + newSeqId
+              + " ,maxSeqId=" + maxSeqId);
+        }
+      } catch (FileAlreadyExistsException ignored) {
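Note on the suffix rename above: new marker files end in ".seqid" while files
written before this patch end in "_seqid", and isSequenceIdFile() now accepts
both so that markers written before the rename are still recognized. A
minimal sketch of that check (illustrative only; it works on plain strings
rather than org.apache.hadoop.fs.Path):

    public class SeqIdSuffixCheck {
      static final String SUFFIX = ".seqid";      // written from now on
      static final String OLD_SUFFIX = "_seqid";  // legacy, still recognized

      static boolean isSequenceIdFile(String name) {
        return name.endsWith(SUFFIX) || name.endsWith(OLD_SUFFIX);
      }

      public static void main(String[] args) {
        System.out.println(isSequenceIdFile("2001.seqid"));          // true
        System.out.println(isSequenceIdFile("2001_seqid"));          // true (legacy)
        System.out.println(isSequenceIdFile("0000000000000001001")); // false: an edits file
      }
    }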
+        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
+      }
     }
     // remove old ones
-    if(files != null) {
+    if (files != null) {
       for (FileStatus status : files) {
+        if (newSeqIdFile.equals(status.getPath())) {
+          continue;
+        }
         fs.delete(status.getPath(), false);
       }
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
index a7fe72766a0..ae78a3ebe9e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
@@ -144,7 +144,16 @@ public class TestHFileArchiving {
     assertTrue(fs.exists(archiveDir));
 
     // check to make sure the store directory was copied
-    FileStatus[] stores = fs.listStatus(archiveDir);
+    // check to make sure the store directory was copied
+    FileStatus[] stores = fs.listStatus(archiveDir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) {
+          return false;
+        }
+        return true;
+      }
+    });
     assertTrue(stores.length == 1);
 
     // make sure we archived the store files
@@ -412,7 +421,15 @@ public class TestHFileArchiving {
    * @throws IOException
    */
   private List<String> getAllFileNames(final FileSystem fs, Path archiveDir) throws IOException {
-    FileStatus[] files = FSUtils.listStatus(fs, archiveDir, null);
+    FileStatus[] files = FSUtils.listStatus(fs, archiveDir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) {
+          return false;
+        }
+        return true;
+      }
+    });
     return recurseOnFiles(fs, files, new ArrayList<String>());
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 378923fbd9f..8daafe45730 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -260,7 +260,15 @@ public class TestDistributedLogSplitting {
       Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
         hri.getEncodedName()));
       LOG.debug("checking edits dir " + editsdir);
-      FileStatus[] files = fs.listStatus(editsdir);
+      FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          if (WALSplitter.isSequenceIdFile(p)) {
+            return false;
+          }
+          return true;
+        }
+      });
       assertTrue("edits dir should have more than a single file in it. instead has "
         + files.length, files.length > 1);
       for (int i = 0; i < files.length; i++) {
@@ -841,7 +849,15 @@ public class TestDistributedLogSplitting {
         Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
           hri.getEncodedName()));
         LOG.debug("checking edits dir " + editsdir);
         if(!fs.exists(editsdir)) continue;
-        FileStatus[] files = fs.listStatus(editsdir);
+        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+          @Override
+          public boolean accept(Path p) {
+            if (WALSplitter.isSequenceIdFile(p)) {
+              return false;
+            }
+            return true;
+          }
+        });
         if(files != null) {
           for(FileStatus file : files) {
             int c = countWAL(file.getPath(), fs, conf);
@@ -1385,11 +1401,10 @@ public class TestDistributedLogSplitting {
     FileSystem fs = master.getMasterFileSystem().getFileSystem();
     Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table"));
     List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
-    WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L);
-    // current SeqId file has seqid=1001
-    WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L);
-    // current SeqId file has seqid=2001
-    assertEquals(3001, WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));
+    long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
+    WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L);
+    assertEquals(newSeqId + 2000,
+      WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));
 
     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0));
     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
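The rewritten assertion in the last hunk above follows from the bump
arithmetic of writeRegionSequenceIdFile: the first call finds no marker file,
so it returns 1 + 1000 = 1001 (captured in newSeqId); the second call finds
1001 > 1 and returns 1001 + 1000 = 2001; the third finds 2001 > 3 and returns
2001 + 1000 = 3001, i.e. newSeqId + 2000, the value the old hard-coded
assertEquals(3001, ...) asserted.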
instead has " + files.length, files.length > 1); for (int i = 0; i < files.length; i++) { @@ -841,7 +849,15 @@ public class TestDistributedLogSplitting { WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); LOG.debug("checking edits dir " + editsdir); if(!fs.exists(editsdir)) continue; - FileStatus[] files = fs.listStatus(editsdir); + FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + if (WALSplitter.isSequenceIdFile(p)) { + return false; + } + return true; + } + }); if(files != null) { for(FileStatus file : files) { int c = countWAL(file.getPath(), fs, conf); @@ -1385,11 +1401,10 @@ public class TestDistributedLogSplitting { FileSystem fs = master.getMasterFileSystem().getFileSystem(); Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table")); List regionDirs = FSUtils.getRegionDirs(fs, tableDir); - WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); - // current SeqId file has seqid=1001 - WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); - // current SeqId file has seqid=2001 - assertEquals(3001, WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); + long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); + WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); + assertEquals(newSeqId + 2000, + WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0)); FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java index e6700df67d3..463fc54575f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java @@ -28,6 +28,8 @@ import java.io.IOException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -121,7 +124,15 @@ public class TestTableDeleteFamilyHandler { FileStatus[] fileStatus = fs.listStatus(tableDir); for (int i = 0; i < fileStatus.length; i++) { if (fileStatus[i].isDirectory() == true) { - FileStatus[] cf = fs.listStatus(fileStatus[i].getPath()); + FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() { + @Override + public boolean accept(Path p) { + if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) { + return false; + } + return true; + } + }); int k = 1; for (int j = 0; j < cf.length; j++) { if (cf[j].isDirectory() == true @@ -148,7 +159,15 @@ public class TestTableDeleteFamilyHandler { fileStatus = 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 1251caa765b..3f551e4d389 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -899,8 +900,16 @@ public class TestWALReplay {
     WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
         this.fs, this.conf, null, null, null, mode, wals);
     FileStatus[] listStatus1 = this.fs.listStatus(
-        new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
-            new Path(hri.getEncodedName(), "recovered.edits")));
+        new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
+            "recovered.edits")), new PathFilter() {
+          @Override
+          public boolean accept(Path p) {
+            if (WALSplitter.isSequenceIdFile(p)) {
+              return false;
+            }
+            return true;
+          }
+        });
     int editCount = 0;
     for (FileStatus fileStatus : listStatus1) {
       editCount = Integer.parseInt(fileStatus.getPath().getName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 13584737b25..c52a534933e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -1157,7 +1158,15 @@ public class TestWALSplit {
     @SuppressWarnings("deprecation")
     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
       Bytes.toString(region.getBytes())));
-    FileStatus [] files = this.fs.listStatus(editsdir);
+    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (WALSplitter.isSequenceIdFile(p)) {
+          return false;
+        }
+        return true;
+      }
+    });
     Path[] paths = new Path[files.length];
     for (int i = 0; i < files.length; i++) {
       paths[i] = files[i].getPath();