diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index 1225d194ecf..e39a25ab84c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -378,6 +378,24 @@ public abstract class CommonFSUtils { tableName.getQualifierAsString()); } + /** + * Returns the {@link org.apache.hadoop.fs.Path} object representing the region + * directory under path rootdir + * + * @param rootdir qualified path of HBase root directory + * @param tableName name of table + * @param regionName The encoded region name + * @return {@link org.apache.hadoop.fs.Path} for region + */ + public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) { + return new Path(getTableDir(rootdir, tableName), regionName); + } + + public static Path getWALTableDir(Configuration c, TableName tableName) throws IOException { + return new Path(getNamespaceDir(getWALRootDir(c), tableName.getNamespaceAsString()), + tableName.getQualifierAsString()); + } + /** * Returns the {@link org.apache.hadoop.hbase.TableName} object representing * the table directory under diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 7d9314b264a..f769e6db18d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; @@ -143,7 +144,9 @@ public class WALSplitter { // Parameters for split process protected final Path walDir; + protected final Path rootDir; protected final FileSystem walFS; + protected final FileSystem rootFS; protected final Configuration conf; // Major subcomponents of the split process. @@ -189,15 +192,17 @@ public class WALSplitter { public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; @VisibleForTesting - WALSplitter(final WALFactory factory, Configuration conf, Path walDir, - FileSystem walFS, LastSequenceId idChecker, - CoordinatedStateManager csm, RecoveryMode mode) { + WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS, + Path rootDir, FileSystem rootFS, LastSequenceId idChecker, CoordinatedStateManager csm, + RecoveryMode mode) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.walDir = walDir; this.walFS = walFS; + this.rootDir = rootDir; + this.rootFS = rootFS; this.sequenceIdChecker = idChecker; this.csm = (BaseCoordinatedStateManager)csm; this.walFactory = factory; @@ -249,7 +254,10 @@ public class WALSplitter { public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException { - WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, cp, mode); + Path rootDir = CommonFSUtils.getRootDir(conf); + FileSystem rootFS = rootDir.getFileSystem(conf); + WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, cp, + mode); return s.splitLogFile(logfile, reporter); } @@ -263,9 +271,11 @@ public class WALSplitter { Collections.singletonList(logDir), null); List splits = new ArrayList(); if (logfiles != null && logfiles.length > 0) { + Path rootDir = CommonFSUtils.getRootDir(conf); + FileSystem rootFS = rootDir.getFileSystem(conf); for (FileStatus logfile: logfiles) { - WALSplitter s = new WALSplitter(factory, conf, walRootDir, walFs, null, null, - RecoveryMode.LOG_SPLITTING); + WALSplitter s = new WALSplitter(factory, conf, walRootDir, walFs, rootDir, rootFS, null, + null, RecoveryMode.LOG_SPLITTING); if (s.splitLogFile(logfile, null)) { finishSplitLogFile(walRootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { @@ -347,33 +357,44 @@ public class WALSplitter { String encodedRegionNameAsStr = Bytes.toString(region); lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); if (lastFlushedSequenceId == null) { - if (this.distributedLogReplay) { - RegionStoreSequenceIds ids = - csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, - encodedRegionNameAsStr); - if (ids != null) { + if (!(isRegionDirPresentUnderRoot(entry.getKey().getTablename(), + encodedRegionNameAsStr))) { + // The region directory itself is not present in the FS. This indicates that + // region/table is already removed. We can skip all the edits for this region. + // Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits will get + // skipped by the seqId check below. See more details in HBASE-24189 + LOG.info(encodedRegionNameAsStr + + " no longer available in the FS. Skipping all edits for this region."); + lastFlushedSequenceId = Long.MAX_VALUE; + } else { + if (this.distributedLogReplay) { + RegionStoreSequenceIds ids = csm.getSplitLogWorkerCoordination() + .getRegionFlushedSequenceId(failedServerName, encodedRegionNameAsStr); + if (ids != null) { + lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + if (LOG.isDebugEnabled()) { + LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + + TextFormat.shortDebugString(ids)); + } + } + } else if (sequenceIdChecker != null) { + RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); + Map maxSeqIdInStores = new TreeMap( + Bytes.BYTES_COMPARATOR); + for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) { + maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), + storeSeqId.getSequenceId()); + } + regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); lastFlushedSequenceId = ids.getLastFlushedSequenceId(); if (LOG.isDebugEnabled()) { - LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + - TextFormat.shortDebugString(ids)); + LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + + TextFormat.shortDebugString(ids)); } } - } else if (sequenceIdChecker != null) { - RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); - Map maxSeqIdInStores = new TreeMap(Bytes.BYTES_COMPARATOR); - for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) { - maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), - storeSeqId.getSequenceId()); + if (lastFlushedSequenceId == null) { + lastFlushedSequenceId = -1L; } - regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); - lastFlushedSequenceId = ids.getLastFlushedSequenceId(); - if (LOG.isDebugEnabled()) { - LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + - TextFormat.shortDebugString(ids)); - } - } - if (lastFlushedSequenceId == null) { - lastFlushedSequenceId = -1L; } lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); } @@ -444,6 +465,12 @@ public class WALSplitter { return !progress_failed; } + private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName) + throws IOException { + Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName); + return this.rootFS.exists(regionDirPath); + } + /** * Completes the work done by splitLogFile by archiving logs *

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 4bf211dcfa6..31717b61394 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -172,12 +173,15 @@ public class TestWALFactory { Path oldLogDir = new Path(hbaseWALDir, HConstants.HREGION_OLDLOGDIR_NAME); final int howmany = 3; HRegionInfo[] infos = new HRegionInfo[3]; + Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName); + fs.mkdirs(tableDataDir); Path tabledir = FSUtils.getWALTableDir(conf, tableName); fs.mkdirs(tabledir); for(int i = 0; i < howmany; i++) { infos[i] = new HRegionInfo(tableName, Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false); fs.mkdirs(new Path(tabledir, infos[i].getEncodedName())); + fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName())); LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString()); } HTableDescriptor htd = new HTableDescriptor(tableName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index 613dea327e5..a5351cf8354 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -155,7 +155,7 @@ public class TestWALReaderOnSecureWAL { RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); Path rootdir = FSUtils.getRootDir(conf); try { - WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode); + WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, mode); s.splitLogFile(listStatus[0], null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); @@ -200,7 +200,7 @@ public class TestWALReaderOnSecureWAL { RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); Path rootdir = FSUtils.getRootDir(conf); try { - WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode); + WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, mode); s.splitLogFile(listStatus[0], null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 153b182d1e6..be80d78817c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -803,7 +803,7 @@ public class TestWALSplit { assertTrue("There should be some log greater than size 0.", 0 < largestSize); // Set up a splitter that will throw an IOE on the output side WALSplitter logSplitter = new WALSplitter(wals, - conf, HBASEDIR, fs, null, null, this.mode) { + conf, HBASEDIR, fs, HBASEDIR, fs, null, null, this.mode) { @Override protected Writer createWriter(Path logfile) throws IOException { Writer mockWriter = Mockito.mock(Writer.class); @@ -989,7 +989,7 @@ public class TestWALSplit { // Create a splitter that reads and writes the data without touching disk WALSplitter logSplitter = new WALSplitter(wals, - localConf, HBASEDIR, fs, null, null, this.mode) { + localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, this.mode) { /* Produce a mock writer that doesn't write anywhere */ @Override @@ -1140,7 +1140,7 @@ public class TestWALSplit { logfiles != null && logfiles.length > 0); WALSplitter logSplitter = new WALSplitter(wals, - conf, HBASEDIR, fs, null, null, this.mode) { + conf, HBASEDIR, fs, HBASEDIR, fs, null, null, this.mode) { @Override protected Writer createWriter(Path logfile) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitWithDeletedTableData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitWithDeletedTableData.java new file mode 100644 index 00000000000..f0aa54df53a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitWithDeletedTableData.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({RegionServerTests.class, LargeTests.class}) +public class TestWALSplitWithDeletedTableData { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setup() throws Exception { + TEST_UTIL.startMiniCluster(2); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testWALSplitWithDeletedTableData() throws Exception { + final byte[] CFNAME = Bytes.toBytes("f1"); + final byte[] QNAME = Bytes.toBytes("q1"); + final byte[] VALUE = Bytes.toBytes("v1"); + final TableName t1 = TableName.valueOf("t1"); + final TableName t2 = TableName.valueOf("t2"); + final byte[][] splitRows = { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), + Bytes.toBytes("d") }; + HTableDescriptor htd = new HTableDescriptor(t1); + htd.addFamily(new HColumnDescriptor(CFNAME)); + Table tab1 = TEST_UTIL.createTable(htd, splitRows); + HTableDescriptor htd2 = new HTableDescriptor(t2); + htd2.addFamily(new HColumnDescriptor(CFNAME)); + Table tab2 = TEST_UTIL.createTable(htd2, splitRows); + List puts = new ArrayList(4); + byte[][] rks = { Bytes.toBytes("ac"), Bytes.toBytes("ba"), Bytes.toBytes("ca"), + Bytes.toBytes("dd") }; + for (byte[] rk : rks) { + puts.add(new Put(rk).addColumn(CFNAME, QNAME, VALUE)); + } + tab1.put(puts); + tab2.put(puts); + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + TEST_UTIL.deleteTable(t1); + Path tableDir = CommonFSUtils.getWALTableDir(TEST_UTIL.getConfiguration(), t1); + // Dropping table 't1' removed the table directory from the WAL FS completely + assertFalse(TEST_UTIL.getDFSCluster().getFileSystem().exists(tableDir)); + ServerName rs1 = cluster.getRegionServer(1).getServerName(); + // Kill one RS and wait for the WAL split and replay be over. + cluster.killRegionServer(rs1); + cluster.waitForRegionServerToStop(rs1, 60 * 1000); + assertEquals(1, cluster.hbaseCluster.getLiveRegionServers().size()); + Thread.sleep(1 * 1000); + TEST_UTIL.waitUntilNoRegionsInTransition(60 * 1000); + // Table 't1' is dropped. Assert table directory does not exist in WAL FS after WAL split. + assertFalse(TEST_UTIL.getDFSCluster().getFileSystem().exists(tableDir)); + // Assert the table t2 region's data getting replayed after WAL split and available + for (byte[] rk : rks) { + Result result = tab2.get(new Get(rk)); + assertFalse(result.isEmpty()); + Cell cell = result.getColumnLatestCell(CFNAME, QNAME); + assertNotNull(cell); + assertTrue(CellUtil.matchingValue(cell, VALUE)); + } + } +} \ No newline at end of file