HBASE-24189 WALSplit recreates region dirs for deleted table with recovered edits data.
parent a21e9049a5
commit 81ee344439
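For orientation, here is a minimal sketch of the idea behind the change (class and helper names in this snippet are illustrative only; the real implementation is the WALSplitter change in the hunks below): before recovered edits are written for a region, the splitter checks whether the region directory still exists under the root filesystem, and if it does not, it treats every edit as already flushed so nothing is recreated for a dropped table.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.CommonFSUtils;

// Simplified sketch, not the patch itself.
public class RegionDirCheckSketch {

  // Hypothetical helper mirroring WALSplitter#isRegionDirPresentUnderRoot in the diff below.
  static boolean isRegionDirPresent(FileSystem rootFS, Path rootDir, TableName table,
      String encodedRegionName) throws IOException {
    Path regionDir = CommonFSUtils.getRegionDir(rootDir, table, encodedRegionName);
    return rootFS.exists(regionDir);
  }

  // Returns the sequence id to compare edits against: Long.MAX_VALUE skips everything.
  static long lastFlushedSequenceIdFor(FileSystem rootFS, Path rootDir, TableName table,
      String encodedRegionName, long knownLastFlushed) throws IOException {
    if (!isRegionDirPresent(rootFS, rootDir, table, encodedRegionName)) {
      return Long.MAX_VALUE; // region/table already removed; skip all edits
    }
    return knownLastFlushed;
  }
}

In the actual patch this check backs the new WALSplitter#isRegionDirPresentUnderRoot, and the Long.MAX_VALUE sentinel makes the existing per-edit sequence-id comparison discard everything for the missing region.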
@@ -378,6 +378,24 @@ public abstract class CommonFSUtils {
         tableName.getQualifierAsString());
   }
+
+  /**
+   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region
+   * directory under path rootdir
+   *
+   * @param rootdir qualified path of HBase root directory
+   * @param tableName name of table
+   * @param regionName The encoded region name
+   * @return {@link org.apache.hadoop.fs.Path} for region
+   */
+  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
+    return new Path(getTableDir(rootdir, tableName), regionName);
+  }
+
+  public static Path getWALTableDir(Configuration c, TableName tableName) throws IOException {
+    return new Path(getNamespaceDir(getWALRootDir(c), tableName.getNamespaceAsString()),
+        tableName.getQualifierAsString());
+  }
 
   /**
    * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
    * the table directory under
@@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -143,7 +144,9 @@ public class WALSplitter {
 
   // Parameters for split process
   protected final Path walDir;
+  protected final Path rootDir;
   protected final FileSystem walFS;
+  protected final FileSystem rootFS;
   protected final Configuration conf;
 
   // Major subcomponents of the split process.
@@ -189,15 +192,17 @@ public class WALSplitter {
   public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
 
   @VisibleForTesting
-  WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
-      FileSystem walFS, LastSequenceId idChecker,
-      CoordinatedStateManager csm, RecoveryMode mode) {
+  WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
+      Path rootDir, FileSystem rootFS, LastSequenceId idChecker, CoordinatedStateManager csm,
+      RecoveryMode mode) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
     this.walDir = walDir;
     this.walFS = walFS;
+    this.rootDir = rootDir;
+    this.rootFS = rootFS;
     this.sequenceIdChecker = idChecker;
     this.csm = (BaseCoordinatedStateManager)csm;
     this.walFactory = factory;
@@ -249,7 +254,10 @@ public class WALSplitter {
   public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
       CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
-    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, cp, mode);
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    FileSystem rootFS = rootDir.getFileSystem(conf);
+    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, cp,
+      mode);
     return s.splitLogFile(logfile, reporter);
   }
 
@@ -263,9 +271,11 @@ public class WALSplitter {
         Collections.singletonList(logDir), null);
     List<Path> splits = new ArrayList<Path>();
     if (logfiles != null && logfiles.length > 0) {
+      Path rootDir = CommonFSUtils.getRootDir(conf);
+      FileSystem rootFS = rootDir.getFileSystem(conf);
       for (FileStatus logfile: logfiles) {
-        WALSplitter s = new WALSplitter(factory, conf, walRootDir, walFs, null, null,
-            RecoveryMode.LOG_SPLITTING);
+        WALSplitter s = new WALSplitter(factory, conf, walRootDir, walFs, rootDir, rootFS, null,
+            null, RecoveryMode.LOG_SPLITTING);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(walRootDir, oldLogDir, logfile.getPath(), conf);
           if (s.outputSink.splits != null) {
@@ -347,33 +357,44 @@ public class WALSplitter {
         String encodedRegionNameAsStr = Bytes.toString(region);
         lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
         if (lastFlushedSequenceId == null) {
-          if (this.distributedLogReplay) {
-            RegionStoreSequenceIds ids =
-                csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
-                  encodedRegionNameAsStr);
-            if (ids != null) {
+          if (!(isRegionDirPresentUnderRoot(entry.getKey().getTablename(),
+            encodedRegionNameAsStr))) {
+            // The region directory itself is not present in the FS. This indicates that
+            // region/table is already removed. We can skip all the edits for this region.
+            // Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits will get
+            // skipped by the seqId check below. See more details in HBASE-24189
+            LOG.info(encodedRegionNameAsStr
+                + " no longer available in the FS. Skipping all edits for this region.");
+            lastFlushedSequenceId = Long.MAX_VALUE;
+          } else {
+            if (this.distributedLogReplay) {
+              RegionStoreSequenceIds ids = csm.getSplitLogWorkerCoordination()
+                  .getRegionFlushedSequenceId(failedServerName, encodedRegionNameAsStr);
+              if (ids != null) {
+                lastFlushedSequenceId = ids.getLastFlushedSequenceId();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
+                      + TextFormat.shortDebugString(ids));
+                }
+              }
+            } else if (sequenceIdChecker != null) {
+              RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
+              Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+                  Bytes.BYTES_COMPARATOR);
+              for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
+                maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
+                  storeSeqId.getSequenceId());
+              }
+              regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
              if (LOG.isDebugEnabled()) {
-                LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
-                  TextFormat.shortDebugString(ids));
+                LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
+                    + TextFormat.shortDebugString(ids));
              }
            }
-          } else if (sequenceIdChecker != null) {
-            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
-            Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
-            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
-              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
-                storeSeqId.getSequenceId());
+            if (lastFlushedSequenceId == null) {
+              lastFlushedSequenceId = -1L;
            }
-            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
-            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
-                  TextFormat.shortDebugString(ids));
            }
-          }
-          if (lastFlushedSequenceId == null) {
-            lastFlushedSequenceId = -1L;
-          }
           lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
         }
@@ -444,6 +465,12 @@ public class WALSplitter {
     return !progress_failed;
   }
+
+  private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName)
+      throws IOException {
+    Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName);
+    return this.rootFS.exists(regionDirPath);
+  }
 
   /**
    * Completes the work done by splitLogFile by archiving logs
    * <p>
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -172,12 +173,15 @@ public class TestWALFactory {
     Path oldLogDir = new Path(hbaseWALDir, HConstants.HREGION_OLDLOGDIR_NAME);
     final int howmany = 3;
     HRegionInfo[] infos = new HRegionInfo[3];
+    Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
+    fs.mkdirs(tableDataDir);
     Path tabledir = FSUtils.getWALTableDir(conf, tableName);
     fs.mkdirs(tabledir);
     for(int i = 0; i < howmany; i++) {
       infos[i] = new HRegionInfo(tableName,
         Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
       fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
+      fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
       LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
     }
     HTableDescriptor htd = new HTableDescriptor(tableName);
@@ -155,7 +155,7 @@ public class TestWALReaderOnSecureWAL {
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     Path rootdir = FSUtils.getRootDir(conf);
     try {
-      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
+      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, mode);
       s.splitLogFile(listStatus[0], null);
       Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
         "corrupt");
@@ -200,7 +200,7 @@ public class TestWALReaderOnSecureWAL {
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     Path rootdir = FSUtils.getRootDir(conf);
     try {
-      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
+      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, mode);
       s.splitLogFile(listStatus[0], null);
       Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
         "corrupt");
@@ -803,7 +803,7 @@ public class TestWALSplit {
     assertTrue("There should be some log greater than size 0.", 0 < largestSize);
     // Set up a splitter that will throw an IOE on the output side
     WALSplitter logSplitter = new WALSplitter(wals,
-        conf, HBASEDIR, fs, null, null, this.mode) {
+        conf, HBASEDIR, fs, HBASEDIR, fs, null, null, this.mode) {
       @Override
       protected Writer createWriter(Path logfile) throws IOException {
         Writer mockWriter = Mockito.mock(Writer.class);
@@ -989,7 +989,7 @@ public class TestWALSplit {
 
     // Create a splitter that reads and writes the data without touching disk
     WALSplitter logSplitter = new WALSplitter(wals,
-        localConf, HBASEDIR, fs, null, null, this.mode) {
+        localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, this.mode) {
 
       /* Produce a mock writer that doesn't write anywhere */
       @Override
@@ -1140,7 +1140,7 @@ public class TestWALSplit {
         logfiles != null && logfiles.length > 0);
 
     WALSplitter logSplitter = new WALSplitter(wals,
-        conf, HBASEDIR, fs, null, null, this.mode) {
+        conf, HBASEDIR, fs, HBASEDIR, fs, null, null, this.mode) {
       @Override
       protected Writer createWriter(Path logfile)
           throws IOException {
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestWALSplitWithDeletedTableData {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testWALSplitWithDeletedTableData() throws Exception {
+    final byte[] CFNAME = Bytes.toBytes("f1");
+    final byte[] QNAME = Bytes.toBytes("q1");
+    final byte[] VALUE = Bytes.toBytes("v1");
+    final TableName t1 = TableName.valueOf("t1");
+    final TableName t2 = TableName.valueOf("t2");
+    final byte[][] splitRows = { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
+        Bytes.toBytes("d") };
+    HTableDescriptor htd = new HTableDescriptor(t1);
+    htd.addFamily(new HColumnDescriptor(CFNAME));
+    Table tab1 = TEST_UTIL.createTable(htd, splitRows);
+    HTableDescriptor htd2 = new HTableDescriptor(t2);
+    htd2.addFamily(new HColumnDescriptor(CFNAME));
+    Table tab2 = TEST_UTIL.createTable(htd2, splitRows);
+    List<Put> puts = new ArrayList<Put>(4);
+    byte[][] rks = { Bytes.toBytes("ac"), Bytes.toBytes("ba"), Bytes.toBytes("ca"),
+        Bytes.toBytes("dd") };
+    for (byte[] rk : rks) {
+      puts.add(new Put(rk).addColumn(CFNAME, QNAME, VALUE));
+    }
+    tab1.put(puts);
+    tab2.put(puts);
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    TEST_UTIL.deleteTable(t1);
+    Path tableDir = CommonFSUtils.getWALTableDir(TEST_UTIL.getConfiguration(), t1);
+    // Dropping table 't1' removed the table directory from the WAL FS completely
+    assertFalse(TEST_UTIL.getDFSCluster().getFileSystem().exists(tableDir));
+    ServerName rs1 = cluster.getRegionServer(1).getServerName();
+    // Kill one RS and wait for the WAL split and replay be over.
+    cluster.killRegionServer(rs1);
+    cluster.waitForRegionServerToStop(rs1, 60 * 1000);
+    assertEquals(1, cluster.hbaseCluster.getLiveRegionServers().size());
+    Thread.sleep(1 * 1000);
+    TEST_UTIL.waitUntilNoRegionsInTransition(60 * 1000);
+    // Table 't1' is dropped. Assert table directory does not exist in WAL FS after WAL split.
+    assertFalse(TEST_UTIL.getDFSCluster().getFileSystem().exists(tableDir));
+    // Assert the table t2 region's data getting replayed after WAL split and available
+    for (byte[] rk : rks) {
+      Result result = tab2.get(new Get(rk));
+      assertFalse(result.isEmpty());
+      Cell cell = result.getColumnLatestCell(CFNAME, QNAME);
+      assertNotNull(cell);
+      assertTrue(CellUtil.matchingValue(cell, VALUE));
+    }
+  }
+}