HBASE-24189 WALSplit recreates region dirs for deleted table with recovered edits data.
This commit is contained in:
parent
1b1c269911
commit
ed7dc9ed4c
|
@ -424,6 +424,19 @@ public final class CommonFSUtils {
|
||||||
tableName.getQualifierAsString());
|
tableName.getQualifierAsString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
|
||||||
|
* path rootdir
|
||||||
|
*
|
||||||
|
* @param rootdir qualified path of HBase root directory
|
||||||
|
* @param tableName name of table
|
||||||
|
* @param regionName The encoded region name
|
||||||
|
* @return {@link org.apache.hadoop.fs.Path} for region
|
||||||
|
*/
|
||||||
|
public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
|
||||||
|
return new Path(getTableDir(rootdir, tableName), regionName);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the {@link org.apache.hadoop.hbase.TableName} object representing
|
* Returns the {@link org.apache.hadoop.hbase.TableName} object representing
|
||||||
* the table directory under
|
* the table directory under
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableDescriptors;
|
import org.apache.hadoop.hbase.TableDescriptors;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
||||||
import org.apache.hadoop.hbase.master.SplitLogManager;
|
import org.apache.hadoop.hbase.master.SplitLogManager;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
|
@ -284,23 +285,34 @@ public class WALSplitter {
|
||||||
String encodedRegionNameAsStr = Bytes.toString(region);
|
String encodedRegionNameAsStr = Bytes.toString(region);
|
||||||
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
|
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
|
||||||
if (lastFlushedSequenceId == null) {
|
if (lastFlushedSequenceId == null) {
|
||||||
if (sequenceIdChecker != null) {
|
if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr))) {
|
||||||
RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
|
// The region directory itself is not present in the FS. This indicates that
|
||||||
Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
// the region/table is already removed. We can just skip all the edits for this
|
||||||
for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
|
// region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
|
||||||
maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
|
// will get skipped by the seqId check below.
|
||||||
storeSeqId.getSequenceId());
|
// See more details at https://issues.apache.org/jira/browse/HBASE-24189
|
||||||
|
LOG.info("{} no longer available in the FS. Skipping all edits for this region.",
|
||||||
|
encodedRegionNameAsStr);
|
||||||
|
lastFlushedSequenceId = Long.MAX_VALUE;
|
||||||
|
} else {
|
||||||
|
if (sequenceIdChecker != null) {
|
||||||
|
RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
|
||||||
|
Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
|
||||||
|
maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
|
||||||
|
storeSeqId.getSequenceId());
|
||||||
|
}
|
||||||
|
regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
|
||||||
|
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
|
||||||
|
+ TextFormat.shortDebugString(ids));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
|
if (lastFlushedSequenceId == null) {
|
||||||
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
|
lastFlushedSequenceId = -1L;
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
|
|
||||||
TextFormat.shortDebugString(ids));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (lastFlushedSequenceId == null) {
|
|
||||||
lastFlushedSequenceId = -1L;
|
|
||||||
}
|
|
||||||
lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
|
lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
|
||||||
}
|
}
|
||||||
if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
|
if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
|
||||||
|
@ -376,6 +388,12 @@ public class WALSplitter {
|
||||||
return !progressFailed;
|
return !progressFailed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName)
|
||||||
|
throws IOException {
|
||||||
|
Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName);
|
||||||
|
return this.rootFS.exists(regionDirPath);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new {@link Reader} for reading logs to split.
|
* Create a new {@link Reader} for reading logs to split.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -181,12 +181,15 @@ public class TestWALFactory {
|
||||||
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
|
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
|
||||||
final int howmany = 3;
|
final int howmany = 3;
|
||||||
RegionInfo[] infos = new RegionInfo[3];
|
RegionInfo[] infos = new RegionInfo[3];
|
||||||
|
Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
|
||||||
|
fs.mkdirs(tableDataDir);
|
||||||
Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName);
|
Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName);
|
||||||
fs.mkdirs(tabledir);
|
fs.mkdirs(tabledir);
|
||||||
for (int i = 0; i < howmany; i++) {
|
for (int i = 0; i < howmany; i++) {
|
||||||
infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
|
infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
|
||||||
.setEndKey(Bytes.toBytes("" + (i + 1))).build();
|
.setEndKey(Bytes.toBytes("" + (i + 1))).build();
|
||||||
fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
|
fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
|
||||||
|
fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
|
||||||
LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
|
LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
|
||||||
}
|
}
|
||||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
|
|
@ -0,0 +1,116 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({RegionServerTests.class, LargeTests.class})
|
||||||
|
public class TestWALSplitWithDeletedTableData {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
|
||||||
|
.forClass(TestWALSplitWithDeletedTableData.class);
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws Exception {
|
||||||
|
TEST_UTIL.startMiniCluster(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWALSplitWithDeletedTableData() throws Exception {
|
||||||
|
final byte[] CFNAME = Bytes.toBytes("f1");
|
||||||
|
final byte[] QNAME = Bytes.toBytes("q1");
|
||||||
|
final byte[] VALUE = Bytes.toBytes("v1");
|
||||||
|
final TableName t1 = TableName.valueOf("t1");
|
||||||
|
final TableName t2 = TableName.valueOf("t2");
|
||||||
|
final byte[][] splitRows = { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
|
||||||
|
Bytes.toBytes("d") };
|
||||||
|
TableDescriptorBuilder htdBuilder1 = TableDescriptorBuilder.newBuilder(t1);
|
||||||
|
htdBuilder1.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CFNAME).build());
|
||||||
|
Table tab1 = TEST_UTIL.createTable(htdBuilder1.build(), splitRows);
|
||||||
|
TableDescriptorBuilder htdBuilder2 = TableDescriptorBuilder.newBuilder(t2);
|
||||||
|
htdBuilder2.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CFNAME).build());
|
||||||
|
Table tab2 = TEST_UTIL.createTable(htdBuilder2.build(), splitRows);
|
||||||
|
List<Put> puts = new ArrayList<Put>(4);
|
||||||
|
byte[][] rks = { Bytes.toBytes("ac"), Bytes.toBytes("ba"), Bytes.toBytes("ca"),
|
||||||
|
Bytes.toBytes("dd") };
|
||||||
|
for (byte[] rk : rks) {
|
||||||
|
puts.add(new Put(rk).addColumn(CFNAME, QNAME, VALUE));
|
||||||
|
}
|
||||||
|
tab1.put(puts);
|
||||||
|
tab2.put(puts);
|
||||||
|
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
|
||||||
|
TEST_UTIL.deleteTable(t1);
|
||||||
|
Path tableDir = CommonFSUtils.getWALTableDir(TEST_UTIL.getConfiguration(), t1);
|
||||||
|
// Dropping table 't1' removed the table directory from the WAL FS completely
|
||||||
|
assertFalse(TEST_UTIL.getDFSCluster().getFileSystem().exists(tableDir));
|
||||||
|
ServerName rs1 = cluster.getRegionServer(1).getServerName();
|
||||||
|
// Kill one RS and wait for the WAL split and replay be over.
|
||||||
|
cluster.killRegionServer(rs1);
|
||||||
|
cluster.waitForRegionServerToStop(rs1, 60 * 1000);
|
||||||
|
assertEquals(1, cluster.getNumLiveRegionServers());
|
||||||
|
Thread.sleep(1 * 1000);
|
||||||
|
TEST_UTIL.waitUntilNoRegionsInTransition(60 * 1000);
|
||||||
|
// Table 't1' is dropped. Assert table directory does not exist in WAL FS after WAL split.
|
||||||
|
assertFalse(TEST_UTIL.getDFSCluster().getFileSystem().exists(tableDir));
|
||||||
|
// Assert the table t2 region's data getting replayed after WAL split and available
|
||||||
|
for (byte[] rk : rks) {
|
||||||
|
Result result = tab2.get(new Get(rk));
|
||||||
|
assertFalse(result.isEmpty());
|
||||||
|
Cell cell = result.getColumnLatestCell(CFNAME, QNAME);
|
||||||
|
assertNotNull(cell);
|
||||||
|
assertTrue(CellUtil.matchingValue(cell, VALUE));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue