HBASE-17712 Remove/Simplify the logic of RegionScannerImpl.handleFileNotFound

This commit is contained in:
zhangduo 2017-03-06 21:00:50 +08:00
parent c42a0665b9
commit 58c76192bd
9 changed files with 356 additions and 122 deletions

View File

@ -202,6 +202,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String HREGION_MVCC_PRE_ASSIGN = "hbase.hregion.mvcc.preassign";
public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true;
public static final String HREGION_UNASSIGN_FOR_FNFE = "hbase.hregion.unassign.for.fnfe";
public static final boolean DEFAULT_HREGION_UNASSIGN_FOR_FNFE = true;
/**
* This is the global default value for durability. All tables/mutations not
* defining a durability or using USE_DEFAULT will default to this value.
@ -238,9 +241,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected volatile long lastReplayedOpenRegionSeqId = -1L;
protected volatile long lastReplayedCompactionSeqId = -1L;
// collects Map(s) of Store to sequence Id when handleFileNotFound() is involved
protected List<Map> storeSeqIds = new ArrayList<>();
//////////////////////////////////////////////////////////////////////////////
// Members
//////////////////////////////////////////////////////////////////////////////
@ -642,6 +642,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
// whether to unassign region if we hit FNFE
private final RegionUnassigner regionUnassigner;
/**
* HRegion constructor. This constructor should only be used for testing and
* extensions. Instances of HRegion should be instantiated with the
@ -800,6 +802,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
false :
conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
boolean unassignForFNFE =
conf.getBoolean(HREGION_UNASSIGN_FOR_FNFE, DEFAULT_HREGION_UNASSIGN_FOR_FNFE);
if (unassignForFNFE) {
this.regionUnassigner = new RegionUnassigner(rsServices, fs.getRegionInfo());
} else {
this.regionUnassigner = null;
}
}
void setHTableSpecificConf() {
@ -1536,7 +1545,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Running coprocessor pre-close hooks");
this.coprocessorHost.preClose(abort);
}
status.setStatus("Disabling compacts and flushes for region");
boolean canFlush = true;
synchronized (writestate) {
@ -1697,7 +1705,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// stop the Compacted hfile discharger
if (this.compactedFileDischarger != null) this.compactedFileDischarger.cancel(true);
status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
@ -5070,15 +5077,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
justification = "Notify is about post replay. Intentional")
@Override
public boolean refreshStoreFiles() throws IOException {
return refreshStoreFiles(false);
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
justification="Notify is about post replay. Intentional")
protected boolean refreshStoreFiles(boolean force) throws IOException {
if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return false; // if primary nothing to do
}
@ -5110,7 +5113,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// see whether we can drop the memstore or the snapshot
if (storeSeqId > maxSeqIdBefore) {
if (writestate.flushing) {
// only drop memstore snapshots if they are smaller than last flush for the store
if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
@ -5150,17 +5152,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
if (!map.isEmpty()) {
if (!force) {
for (Map.Entry<Store, Long> entry : map.entrySet()) {
// Drop the memstore contents if they are now smaller than the latest seen flushed file
totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
.getDataSize();
}
} else {
synchronized (storeSeqIds) {
// don't try to acquire write lock of updatesLock now
storeSeqIds.add(map);
}
for (Map.Entry<Store, Long> entry : map.entrySet()) {
// Drop the memstore contents if they are now smaller than the latest seen flushed file
totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
.getDataSize();
}
}
// C. Finally notify anyone waiting on memstore to clear:
@ -5844,12 +5839,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
KeyValueScanner scanner;
try {
scanner = store.getScanner(scan, entry.getValue(), this.readPt);
} catch (FileNotFoundException e) {
throw handleFileNotFound(e);
}
KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
instantiatedScanners.add(scanner);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
@ -5873,8 +5863,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
private void handleFileNotFound(Throwable fnfe) {
// Try reopenning the region since we have lost some storefiles.
// See HBASE-17712 for more details.
LOG.warn("A store file got lost", fnfe);
if (regionUnassigner != null) {
regionUnassigner.unassign();
}
}
private IOException handleException(List<KeyValueScanner> instantiatedScanners,
Throwable t) {
if (t instanceof FileNotFoundException) {
handleFileNotFound(t);
}
// remove scaner read point before throw the exception
scannerReadPoints.remove(this);
if (storeHeap != null) {
@ -5957,14 +5959,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new UnknownScannerException("Scanner was closed");
}
boolean moreValues = false;
if (outResults.isEmpty()) {
// Usually outResults is empty. This is true when next is called
// to handle scan or get operation.
moreValues = nextInternal(outResults, scannerContext);
} else {
List<Cell> tmpList = new ArrayList<>();
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
try {
if (outResults.isEmpty()) {
// Usually outResults is empty. This is true when next is called
// to handle scan or get operation.
moreValues = nextInternal(outResults, scannerContext);
} else {
List<Cell> tmpList = new ArrayList<Cell>();
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
} catch (FileNotFoundException e) {
handleFileNotFound(e);
throw e;
}
// If the size limit was reached it means a partial Result is being returned. Returning a
@ -6014,33 +6021,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean tmpKeepProgress = scannerContext.getKeepProgress();
// Scanning between column families and thus the scope is between cells
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
try {
do {
// We want to maintain any progress that is made towards the limits while scanning across
// different column families. To do this, we toggle the keep progress flag on during calls
// to the StoreScanner to ensure that any progress made thus far is not wiped away.
scannerContext.setKeepProgress(true);
heap.next(results, scannerContext);
scannerContext.setKeepProgress(tmpKeepProgress);
do {
// We want to maintain any progress that is made towards the limits while scanning across
// different column families. To do this, we toggle the keep progress flag on during calls
// to the StoreScanner to ensure that any progress made thus far is not wiped away.
scannerContext.setKeepProgress(true);
heap.next(results, scannerContext);
scannerContext.setKeepProgress(tmpKeepProgress);
nextKv = heap.peek();
moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
} else if (scannerContext.checkSizeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
} else if (scannerContext.checkTimeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
}
} while (moreCellsInRow);
} catch (FileNotFoundException e) {
throw handleFileNotFound(e);
}
nextKv = heap.peek();
moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
} else if (scannerContext.checkSizeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
} else if (scannerContext.checkTimeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
}
} while (moreCellsInRow);
return nextKv != null;
}
@ -6389,35 +6392,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
result = this.joinedHeap.requestSeek(kv, true, true) || result;
}
} catch (FileNotFoundException e) {
throw handleFileNotFound(e);
handleFileNotFound(e);
throw e;
} finally {
closeRegionOperation();
}
return result;
}
private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
// tries to refresh the store files, otherwise shutdown the RS.
// TODO: add support for abort() of a single region and trigger reassignment.
try {
region.refreshStoreFiles(true);
return new IOException("unable to read store file");
} catch (IOException e) {
String msg = "a store file got lost: " + fnfe.getMessage();
LOG.error("unable to refresh store files", e);
abortRegionServer(msg);
return new NotServingRegionException(
getRegionInfo().getRegionNameAsString() + " is closing");
}
}
private void abortRegionServer(String msg) throws IOException {
if (rsServices instanceof HRegionServer) {
((HRegionServer)rsServices).abort(msg);
}
throw new UnsupportedOperationException("not able to abort RS after: " + msg);
}
@Override
public void shipped() throws IOException {
if (storeHeap != null) {
@ -7220,29 +7202,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
// dropMemstoreContentsForSeqId() would acquire write lock of updatesLock
// We perform this operation outside of the read lock of updatesLock to avoid dead lock
// See HBASE-16304
@SuppressWarnings("unchecked")
private void dropMemstoreContents() throws IOException {
MemstoreSize totalFreedSize = new MemstoreSize();
while (!storeSeqIds.isEmpty()) {
Map<Store, Long> map = null;
synchronized (storeSeqIds) {
if (storeSeqIds.isEmpty()) break;
map = storeSeqIds.remove(storeSeqIds.size()-1);
}
for (Map.Entry<Store, Long> entry : map.entrySet()) {
// Drop the memstore contents if they are now smaller than the latest seen flushed file
totalFreedSize
.incMemstoreSize(dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()));
}
}
if (totalFreedSize.getDataSize() > 0) {
LOG.debug("Freed " + totalFreedSize.getDataSize() + " bytes from memstore");
}
}
@Override
public Result increment(Increment mutation, long nonceGroup, long nonce)
throws IOException {
@ -7306,10 +7265,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
writeEntry = null;
} finally {
this.updatesLock.readLock().unlock();
// For increment/append, a region scanner for doing a get operation could throw
// FileNotFoundException. So we call dropMemstoreContents() in finally block
// after releasing read lock
dropMemstoreContents();
}
// If results is null, then client asked that we not return the calculated results.
return results != null && returnResults? Result.create(results): Result.EMPTY_RESULT;

View File

@ -3651,4 +3651,9 @@ public class HRegionServer extends HasThread implements
return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
.regionLock(regionInfos, description, abort);
}
@Override
public void unassign(byte[] regionName) throws IOException {
clusterConnection.getAdmin().unassign(regionName, false);
}
}

View File

@ -273,4 +273,14 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
*/
EntityLock regionLock(List<HRegionInfo> regionInfos, String description,
Abortable abort) throws IOException;
/**
* Unassign the given region from the current regionserver and assign it randomly. Could still be
* assigned to us. This is used to solve some tough problems for which you need to reset the state
* of a region. For example, if you hit FileNotFound exception and want to refresh the store file
* list.
* <p>
* See HBASE-17712 for more details.
*/
void unassign(byte[] regionName) throws IOException;
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Used to unssign a region when we hit FNFE.
*/
@InterfaceAudience.Private
class RegionUnassigner {
private static final Log LOG = LogFactory.getLog(RegionUnassigner.class);
private final RegionServerServices rsServices;
private final HRegionInfo regionInfo;
private boolean unassigning = false;
RegionUnassigner(RegionServerServices rsServices, HRegionInfo regionInfo) {
this.rsServices = rsServices;
this.regionInfo = regionInfo;
}
synchronized void unassign() {
if (unassigning) {
return;
}
unassigning = true;
new Thread("Unassign-" + regionInfo) {
@Override
public void run() {
LOG.info("Unassign " + regionInfo.getRegionNameAsString());
try {
rsServices.unassign(regionInfo.getRegionName());
} catch (IOException e) {
LOG.warn("Unassigned " + regionInfo.getRegionNameAsString() + " failed", e);
} finally {
synchronized (RegionUnassigner.this) {
unassigning = false;
}
}
}
}.start();
}
}

View File

@ -615,7 +615,15 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
break;
}
} else {
throw e.unwrapRemoteException();
IOException ioe = e.unwrapRemoteException();
// this usually means master already think we are dead so let's fail all the pending
// syncs. The shutdown process of RS will wait for all regions to be closed before calling
// WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
// lock.
if (e.getMessage().contains("Parent directory doesn't exist:")) {
syncFutures.forEach(f -> f.done(f.getTxid(), ioe));
}
throw ioe;
}
} catch (NameNodeException e) {
throw e;
@ -696,6 +704,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
this.writer.close();
this.writer = null;
closeExecutor.shutdown();
IOException error = new IOException("WAL has been closed");
syncFutures.forEach(f -> f.done(f.getTxid(), error));
}
@Override

View File

@ -356,4 +356,8 @@ public class MockRegionServerServices implements RegionServerServices {
public SecureBulkLoadManager getSecureBulkLoadManager() {
return null;
}
@Override
public void unassign(byte[] regionName) throws IOException {
}
}

View File

@ -716,4 +716,8 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
public SecureBulkLoadManager getSecureBulkLoadManager() {
return null;
}
@Override
public void unassign(byte[] regionName) throws IOException {
}
}

View File

@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/**
* This testcase is used to ensure that the compaction marker will fail a compaction if the RS is
* already dead. It can not eliminate FNFE when scanning but it does reduce the possibility a lot.
*/
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
public class TestCompactionInDeadRegionServer {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf("test");
private static final byte[] CF = Bytes.toBytes("cf");
private static final byte[] CQ = Bytes.toBytes("cq");
public static final class IgnoreYouAreDeadRS extends HRegionServer {
public IgnoreYouAreDeadRS(Configuration conf) throws IOException, InterruptedException {
super(conf);
}
public IgnoreYouAreDeadRS(Configuration conf, CoordinatedStateManager csm) throws IOException {
super(conf, csm);
}
@Override
protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
throws IOException {
try {
super.tryRegionServerReport(reportStartTime, reportEndTime);
} catch (YouAreDeadException e) {
// ignore, do not abort
}
}
}
@Parameter
public Class<? extends WALProvider> walProvider;
@Parameters(name = "{index}: wal={0}")
public static List<Object[]> params() {
return Arrays.asList(new Object[] { FSHLogProvider.class },
new Object[] { AsyncFSWALProvider.class });
}
@Before
public void setUp() throws Exception {
UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, walProvider, WALProvider.class);
UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, IgnoreYouAreDeadRS.class,
HRegionServer.class);
UTIL.startMiniCluster(2);
Table table = UTIL.createTable(TABLE_NAME, CF);
for (int i = 0; i < 10; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
UTIL.getAdmin().flush(TABLE_NAME);
for (int i = 10; i < 20; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
UTIL.getAdmin().flush(TABLE_NAME);
}
@After
public void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void test() throws Exception {
HRegionServer rsToSuspend = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
HRegion region = (HRegion) rsToSuspend.getOnlineRegions(TABLE_NAME).get(0);
ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
watcher.getRecoverableZooKeeper().delete(
ZKUtil.joinZNode(watcher.getZNodePaths().rsZNode, rsToSuspend.getServerName().toString()),
-1);
UTIL.waitFor(60000, 1000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (RegionServerThread thread : UTIL.getHBaseCluster().getRegionServerThreads()) {
HRegionServer rs = thread.getRegionServer();
if (rs != rsToSuspend) {
return !rs.getOnlineRegions(TABLE_NAME).isEmpty();
}
}
return false;
}
@Override
public String explainFailure() throws Exception {
return "The region for " + TABLE_NAME + " is still on " + rsToSuspend.getServerName();
}
});
try {
region.compact(true);
fail("Should fail as our wal file has already been closed, " +
"and walDir has also been renamed");
} catch (Exception e) {
// expected
}
Table table = UTIL.getConnection().getTable(TABLE_NAME);
// should not hit FNFE
for (int i = 0; i < 20; i++) {
assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
}
}
}

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -133,6 +135,23 @@ public class TestCorruptedRegionStoreFile {
}
}
private void removeStoreFile(FileSystem fs, Path tmpStoreFilePath) throws Exception {
try (FSDataInputStream input = fs.open(storeFiles.get(0))) {
fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
LOG.info("Move file to local");
evictHFileCache(storeFiles.get(0));
// make sure that all the replicas have been deleted on DNs.
for (;;) {
try {
input.read(0, new byte[1], 0, 1);
} catch (FileNotFoundException e) {
break;
}
Thread.sleep(1000);
}
}
}
@Test(timeout=180000)
public void testLosingFileDuringScan() throws Exception {
assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
@ -148,9 +167,7 @@ public class TestCorruptedRegionStoreFile {
public void beforeScanNext(Table table) throws Exception {
// move the path away (now the region is corrupted)
if (hasFile) {
fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
LOG.info("Move file to local");
evictHFileCache(storeFiles.get(0));
removeStoreFile(fs, tmpStoreFilePath);
hasFile = false;
}
}
@ -174,9 +191,7 @@ public class TestCorruptedRegionStoreFile {
public void beforeScan(Table table, Scan scan) throws Exception {
// move the path away (now the region is corrupted)
if (hasFile) {
fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
LOG.info("Move file to local");
evictHFileCache(storeFiles.get(0));
removeStoreFile(fs, tmpStoreFilePath);
hasFile = false;
}
}
@ -201,7 +216,6 @@ public class TestCorruptedRegionStoreFile {
HRegionServer rs = rst.getRegionServer();
rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
}
Thread.sleep(6000);
}
private int fullScanAndCount(final TableName tableName) throws Exception {