diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d0c3c1e211d..490f2b205a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -145,8 +145,10 @@ import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.KeeperException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -761,7 +763,8 @@ public class HRegion implements HeapSize { // , Writable{ // In distributedLogReplay mode, we don't know the last change sequence number because region // is opened before recovery completes. So we add a safety bumper to avoid new sequence number // overlaps used sequence numbers - nextSeqid += this.flushPerChanges + 10000000; // add another extra 10million + nextSeqid = HLogUtil.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(), + this.fs.getRegionDir(), nextSeqid, (this.flushPerChanges + 10000000)); } LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 1411455bb69..982501ab510 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -72,6 +72,7 @@ public interface HLog { // TODO: Implementation detail. Why in here? Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; + String SEQUENCE_ID_FILE_SUFFIX = "_seqid"; /** * WAL Reader Interface diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 555fbdcece7..27a01be9adf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; @@ -92,7 +94,7 @@ public class HLogUtil { public static Path getRegionDirRecoveredEditsDir(final Path regiondir) { return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR); } - + /** * Move aside a bad edits file. * @@ -303,4 +305,63 @@ public class HLogUtil { } return trx; } + + /** + * Create a file with name as region open sequence id + * + * @param fs + * @param regiondir + * @param newSeqId + * @param saftyBumper + * @return long new sequence Id value + * @throws IOException + */ + public static long writeRegionOpenSequenceIdFile(final FileSystem fs, final Path regiondir, + long newSeqId, long saftyBumper) throws IOException { + + Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir); + long maxSeqId = 0; + FileStatus[] files = null; + if (fs.exists(editsdir)) { + files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + if (p.getName().endsWith(HLog.SEQUENCE_ID_FILE_SUFFIX)) { + return true; + } + return false; + } + }); + if (files != null) { + for (FileStatus status : files) { + String fileName = status.getPath().getName(); + try { + Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length() + - HLog.SEQUENCE_ID_FILE_SUFFIX.length())); + maxSeqId = Math.max(tmpSeqId, maxSeqId); + } catch (NumberFormatException ex) { + LOG.warn("Invalid SeqId File Name=" + fileName); + } + } + } + } + if (maxSeqId > newSeqId) { + newSeqId = maxSeqId; + } + newSeqId += saftyBumper; // bump up SeqId + + // write a new seqId file + Path newSeqIdFile = new Path(editsdir, newSeqId + HLog.SEQUENCE_ID_FILE_SUFFIX); + if (!fs.createNewFile(newSeqIdFile)) { + throw new IOException("Failed to create SeqId file:" + newSeqIdFile); + } + // remove old ones + if(files != null) { + for (FileStatus status : files) { + fs.delete(status.getPath(), false); + } + } + return newSeqId; + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index fee569de736..56b96b384fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -53,6 +53,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -1364,6 +1365,41 @@ public class TestDistributedLogSplitting { ht.close(); } + @Test(timeout = 300000) + public void testReadWriteSeqIdFiles() throws Exception { + LOG.info("testReadWriteSeqIdFiles"); + startCluster(2); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + HTable ht = installTable(zkw, "table", "family", 10); + FileSystem fs = master.getMasterFileSystem().getFileSystem(); + Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table")); + List regionDirs = FSUtils.getRegionDirs(fs, tableDir); + HLogUtil.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); + // current SeqId file has seqid=1001 + HLogUtil.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); + // current SeqId file has seqid=2001 + assertEquals(3001, HLogUtil.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 3L, 1000L)); + + Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regionDirs.get(0)); + FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + if (p.getName().endsWith(HLog.SEQUENCE_ID_FILE_SUFFIX)) { + return true; + } + return false; + } + }); + // only one seqid file should exist + assertEquals(1, files.length); + + // verify all seqId files aren't treated as recovered.edits files + NavigableSet recoveredEdits = HLogUtil.getSplitEditFilesSorted(fs, regionDirs.get(0)); + assertEquals(0, recoveredEdits.size()); + + ht.close(); + } + HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception { return installTable(zkw, tname, fname, nrs, 0); }