diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index be012200c88..29773c3da8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -202,6 +202,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final String HREGION_MVCC_PRE_ASSIGN = "hbase.hregion.mvcc.preassign"; public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true; + public static final String HREGION_UNASSIGN_FOR_FNFE = "hbase.hregion.unassign.for.fnfe"; + public static final boolean DEFAULT_HREGION_UNASSIGN_FOR_FNFE = true; + /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. @@ -238,9 +241,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected volatile long lastReplayedOpenRegionSeqId = -1L; protected volatile long lastReplayedCompactionSeqId = -1L; - // collects Map(s) of Store to sequence Id when handleFileNotFound() is involved - protected List storeSeqIds = new ArrayList<>(); - ////////////////////////////////////////////////////////////////////////////// // Members ////////////////////////////////////////////////////////////////////////////// @@ -642,6 +642,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final NavigableMap replicationScope = new TreeMap<>( Bytes.BYTES_COMPARATOR); + // whether to unassign region if we hit FNFE + private final RegionUnassigner regionUnassigner; /** * HRegion constructor. This constructor should only be used for testing and * extensions. 
Instances of HRegion should be instantiated with the @@ -800,6 +802,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi false : conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + boolean unassignForFNFE = + conf.getBoolean(HREGION_UNASSIGN_FOR_FNFE, DEFAULT_HREGION_UNASSIGN_FOR_FNFE); + if (unassignForFNFE) { + this.regionUnassigner = new RegionUnassigner(rsServices, fs.getRegionInfo()); + } else { + this.regionUnassigner = null; + } } void setHTableSpecificConf() { @@ -1536,7 +1545,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Running coprocessor pre-close hooks"); this.coprocessorHost.preClose(abort); } - status.setStatus("Disabling compacts and flushes for region"); boolean canFlush = true; synchronized (writestate) { @@ -1697,7 +1705,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // stop the Compacted hfile discharger if (this.compactedFileDischarger != null) this.compactedFileDischarger.cancel(true); - status.markComplete("Closed"); LOG.info("Closed " + this); return result; @@ -5070,15 +5077,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", + justification = "Notify is about post replay. Intentional") @Override public boolean refreshStoreFiles() throws IOException { - return refreshStoreFiles(false); - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", - justification="Notify is about post replay. 
Intentional") - protected boolean refreshStoreFiles(boolean force) throws IOException { - if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { + if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { return false; // if primary nothing to do } @@ -5110,7 +5113,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // see whether we can drop the memstore or the snapshot if (storeSeqId > maxSeqIdBefore) { - if (writestate.flushing) { // only drop memstore snapshots if they are smaller than last flush for the store if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) { @@ -5150,17 +5152,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } if (!map.isEmpty()) { - if (!force) { - for (Map.Entry entry : map.entrySet()) { - // Drop the memstore contents if they are now smaller than the latest seen flushed file - totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()) - .getDataSize(); - } - } else { - synchronized (storeSeqIds) { - // don't try to acquire write lock of updatesLock now - storeSeqIds.add(map); - } + for (Map.Entry entry : map.entrySet()) { + // Drop the memstore contents if they are now smaller than the latest seen flushed file + totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()) + .getDataSize(); } } // C. 
Finally notify anyone waiting on memstore to clear: @@ -5844,12 +5839,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - KeyValueScanner scanner; - try { - scanner = store.getScanner(scan, entry.getValue(), this.readPt); - } catch (FileNotFoundException e) { - throw handleFileNotFound(e); - } + KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); instantiatedScanners.add(scanner); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { @@ -5873,8 +5863,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void handleFileNotFound(Throwable fnfe) { + // Try reopening the region since we have lost some storefiles. + // See HBASE-17712 for more details. + LOG.warn("A store file got lost", fnfe); + if (regionUnassigner != null) { + regionUnassigner.unassign(); + } + } + private IOException handleException(List instantiatedScanners, Throwable t) { + if (t instanceof FileNotFoundException) { + handleFileNotFound(t); + } // remove scaner read point before throw the exception scannerReadPoints.remove(this); if (storeHeap != null) { @@ -5957,14 +5959,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw new UnknownScannerException("Scanner was closed"); } boolean moreValues = false; - if (outResults.isEmpty()) { - // Usually outResults is empty. This is true when next is called - // to handle scan or get operation. - moreValues = nextInternal(outResults, scannerContext); - } else { - List tmpList = new ArrayList<>(); - moreValues = nextInternal(tmpList, scannerContext); - outResults.addAll(tmpList); + try { + if (outResults.isEmpty()) { + // Usually outResults is empty. This is true when next is called + // to handle scan or get operation. 
+ moreValues = nextInternal(outResults, scannerContext); + } else { + List tmpList = new ArrayList(); + moreValues = nextInternal(tmpList, scannerContext); + outResults.addAll(tmpList); + } + } catch (FileNotFoundException e) { + handleFileNotFound(e); + throw e; } // If the size limit was reached it means a partial Result is being returned. Returning a @@ -6014,33 +6021,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean tmpKeepProgress = scannerContext.getKeepProgress(); // Scanning between column families and thus the scope is between cells LimitScope limitScope = LimitScope.BETWEEN_CELLS; - try { - do { - // We want to maintain any progress that is made towards the limits while scanning across - // different column families. To do this, we toggle the keep progress flag on during calls - // to the StoreScanner to ensure that any progress made thus far is not wiped away. - scannerContext.setKeepProgress(true); - heap.next(results, scannerContext); - scannerContext.setKeepProgress(tmpKeepProgress); + do { + // We want to maintain any progress that is made towards the limits while scanning across + // different column families. To do this, we toggle the keep progress flag on during calls + // to the StoreScanner to ensure that any progress made thus far is not wiped away. + scannerContext.setKeepProgress(true); + heap.next(results, scannerContext); + scannerContext.setKeepProgress(tmpKeepProgress); - nextKv = heap.peek(); - moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); - if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); - if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { - return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); - } else if (scannerContext.checkSizeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow? 
NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } else if (scannerContext.checkTimeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } - } while (moreCellsInRow); - } catch (FileNotFoundException e) { - throw handleFileNotFound(e); - } + nextKv = heap.peek(); + moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); + if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); + if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { + return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); + } else if (scannerContext.checkSizeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } else if (scannerContext.checkTimeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } + } while (moreCellsInRow); return nextKv != null; } @@ -6389,35 +6392,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi result = this.joinedHeap.requestSeek(kv, true, true) || result; } } catch (FileNotFoundException e) { - throw handleFileNotFound(e); + handleFileNotFound(e); + throw e; } finally { closeRegionOperation(); } return result; } - private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException { - // tries to refresh the store files, otherwise shutdown the RS. - // TODO: add support for abort() of a single region and trigger reassignment. 
- try { - region.refreshStoreFiles(true); - return new IOException("unable to read store file"); - } catch (IOException e) { - String msg = "a store file got lost: " + fnfe.getMessage(); - LOG.error("unable to refresh store files", e); - abortRegionServer(msg); - return new NotServingRegionException( - getRegionInfo().getRegionNameAsString() + " is closing"); - } - } - - private void abortRegionServer(String msg) throws IOException { - if (rsServices instanceof HRegionServer) { - ((HRegionServer)rsServices).abort(msg); - } - throw new UnsupportedOperationException("not able to abort RS after: " + msg); - } - @Override public void shipped() throws IOException { if (storeHeap != null) { @@ -7220,29 +7202,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); } - // dropMemstoreContentsForSeqId() would acquire write lock of updatesLock - // We perform this operation outside of the read lock of updatesLock to avoid dead lock - // See HBASE-16304 - @SuppressWarnings("unchecked") - private void dropMemstoreContents() throws IOException { - MemstoreSize totalFreedSize = new MemstoreSize(); - while (!storeSeqIds.isEmpty()) { - Map map = null; - synchronized (storeSeqIds) { - if (storeSeqIds.isEmpty()) break; - map = storeSeqIds.remove(storeSeqIds.size()-1); - } - for (Map.Entry entry : map.entrySet()) { - // Drop the memstore contents if they are now smaller than the latest seen flushed file - totalFreedSize - .incMemstoreSize(dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())); - } - } - if (totalFreedSize.getDataSize() > 0) { - LOG.debug("Freed " + totalFreedSize.getDataSize() + " bytes from memstore"); - } - } - @Override public Result increment(Increment mutation, long nonceGroup, long nonce) throws IOException { @@ -7306,10 +7265,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi writeEntry = null; } finally { 
this.updatesLock.readLock().unlock(); - // For increment/append, a region scanner for doing a get operation could throw - // FileNotFoundException. So we call dropMemstoreContents() in finally block - // after releasing read lock - dropMemstoreContents(); } // If results is null, then client asked that we not return the calculated results. return results != null && returnResults? Result.create(results): Result.EMPTY_RESULT; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index cbf6561b838..be4cca05988 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3651,4 +3651,9 @@ public class HRegionServer extends HasThread implements return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator()) .regionLock(regionInfos, description, abort); } + + @Override + public void unassign(byte[] regionName) throws IOException { + clusterConnection.getAdmin().unassign(regionName, false); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index c92124cd1a7..33822634187 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -273,4 +273,14 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi */ EntityLock regionLock(List regionInfos, String description, Abortable abort) throws IOException; + + /** + * Unassign the given region from the current regionserver and assign it randomly. Could still be + * assigned to us. 
This is used to solve some tough problems for which you need to reset the state + * of a region. For example, if you hit FileNotFoundException and want to refresh the store file + * list. + * <p>

+ * See HBASE-17712 for more details. + */ + void unassign(byte[] regionName) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java new file mode 100644 index 00000000000..b347b4b5681 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Used to unassign a region when we hit FNFE. 
+ */ +@InterfaceAudience.Private +class RegionUnassigner { + + private static final Log LOG = LogFactory.getLog(RegionUnassigner.class); + + private final RegionServerServices rsServices; + + private final HRegionInfo regionInfo; + + private boolean unassigning = false; + + RegionUnassigner(RegionServerServices rsServices, HRegionInfo regionInfo) { + this.rsServices = rsServices; + this.regionInfo = regionInfo; + } + + synchronized void unassign() { + if (unassigning) { + return; + } + unassigning = true; + new Thread("Unassign-" + regionInfo) { + + @Override + public void run() { + LOG.info("Unassign " + regionInfo.getRegionNameAsString()); + try { + rsServices.unassign(regionInfo.getRegionName()); + } catch (IOException e) { + LOG.warn("Unassigned " + regionInfo.getRegionNameAsString() + " failed", e); + } finally { + synchronized (RegionUnassigner.this) { + unassigning = false; + } + } + } + }.start(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index c3e96cfa497..69ca1c523cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -615,7 +615,15 @@ public class AsyncFSWAL extends AbstractFSWAL { break; } } else { - throw e.unwrapRemoteException(); + IOException ioe = e.unwrapRemoteException(); + // this usually means master already thinks we are dead so let's fail all the pending + // syncs. The shutdown process of RS will wait for all regions to be closed before calling + // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead + // lock. 
+ if (e.getMessage().contains("Parent directory doesn't exist:")) { + syncFutures.forEach(f -> f.done(f.getTxid(), ioe)); + } + throw ioe; } } catch (NameNodeException e) { throw e; @@ -696,6 +704,8 @@ public class AsyncFSWAL extends AbstractFSWAL { this.writer.close(); this.writer = null; closeExecutor.shutdown(); + IOException error = new IOException("WAL has been closed"); + syncFutures.forEach(f -> f.done(f.getTxid(), error)); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 7740e66ae19..81b34891444 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -356,4 +356,8 @@ public class MockRegionServerServices implements RegionServerServices { public SecureBulkLoadManager getSecureBulkLoadManager() { return null; } + + @Override + public void unassign(byte[] regionName) throws IOException { + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index a5fe9528547..b653e3fd070 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -716,4 +716,8 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public SecureBulkLoadManager getSecureBulkLoadManager() { return null; } + + @Override + public void unassign(byte[] regionName) throws IOException { + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java new file mode 100644 index 00000000000..ae81a4b4ee9 --- /dev/null 
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.YouAreDeadException; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import 
org.apache.hadoop.hbase.wal.FSHLogProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * This testcase is used to ensure that the compaction marker will fail a compaction if the RS is + * already dead. It can not eliminate FNFE when scanning but it does reduce the possibility a lot. + */ +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, LargeTests.class }) +public class TestCompactionInDeadRegionServer { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("test"); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + public static final class IgnoreYouAreDeadRS extends HRegionServer { + + public IgnoreYouAreDeadRS(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + public IgnoreYouAreDeadRS(Configuration conf, CoordinatedStateManager csm) throws IOException { + super(conf, csm); + } + + @Override + protected void tryRegionServerReport(long reportStartTime, long reportEndTime) + throws IOException { + try { + super.tryRegionServerReport(reportStartTime, reportEndTime); + } catch (YouAreDeadException e) { + // ignore, do not abort + } + } + } + + @Parameter + public Class walProvider; + + @Parameters(name = "{index}: wal={0}") + public static List params() { + return Arrays.asList(new Object[] { FSHLogProvider.class }, + new Object[] { AsyncFSWALProvider.class }); + } + + @Before + public 
void setUp() throws Exception { + UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, walProvider, WALProvider.class); + UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, 2000); + UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, IgnoreYouAreDeadRS.class, + HRegionServer.class); + UTIL.startMiniCluster(2); + Table table = UTIL.createTable(TABLE_NAME, CF); + for (int i = 0; i < 10; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + UTIL.getAdmin().flush(TABLE_NAME); + for (int i = 10; i < 20; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + UTIL.getAdmin().flush(TABLE_NAME); + } + + @After + public void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + HRegionServer rsToSuspend = UTIL.getRSForFirstRegionInTable(TABLE_NAME); + HRegion region = (HRegion) rsToSuspend.getOnlineRegions(TABLE_NAME).get(0); + ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher(); + watcher.getRecoverableZooKeeper().delete( + ZKUtil.joinZNode(watcher.getZNodePaths().rsZNode, rsToSuspend.getServerName().toString()), + -1); + UTIL.waitFor(60000, 1000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + for (RegionServerThread thread : UTIL.getHBaseCluster().getRegionServerThreads()) { + HRegionServer rs = thread.getRegionServer(); + if (rs != rsToSuspend) { + return !rs.getOnlineRegions(TABLE_NAME).isEmpty(); + } + } + return false; + } + + @Override + public String explainFailure() throws Exception { + return "The region for " + TABLE_NAME + " is still on " + rsToSuspend.getServerName(); + } + }); + try { + region.compact(true); + fail("Should fail as our wal file has already been closed, " + + "and walDir has also been renamed"); + } catch (Exception e) { + // expected + } + Table table = UTIL.getConnection().getTable(TABLE_NAME); + // should not hit FNFE + for (int i = 0; i < 
20; i++) { + assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ))); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java index cec5fc710fb..42648630eb9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -133,6 +135,23 @@ public class TestCorruptedRegionStoreFile { } } + private void removeStoreFile(FileSystem fs, Path tmpStoreFilePath) throws Exception { + try (FSDataInputStream input = fs.open(storeFiles.get(0))) { + fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath); + LOG.info("Move file to local"); + evictHFileCache(storeFiles.get(0)); + // make sure that all the replicas have been deleted on DNs. 
+ for (;;) { + try { + input.read(0, new byte[1], 0, 1); + } catch (FileNotFoundException e) { + break; + } + Thread.sleep(1000); + } + } + } + @Test(timeout=180000) public void testLosingFileDuringScan() throws Exception { assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName())); @@ -148,9 +167,7 @@ public class TestCorruptedRegionStoreFile { public void beforeScanNext(Table table) throws Exception { // move the path away (now the region is corrupted) if (hasFile) { - fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath); - LOG.info("Move file to local"); - evictHFileCache(storeFiles.get(0)); + removeStoreFile(fs, tmpStoreFilePath); hasFile = false; } } @@ -174,9 +191,7 @@ public class TestCorruptedRegionStoreFile { public void beforeScan(Table table, Scan scan) throws Exception { // move the path away (now the region is corrupted) if (hasFile) { - fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath); - LOG.info("Move file to local"); - evictHFileCache(storeFiles.get(0)); + removeStoreFile(fs, tmpStoreFilePath); hasFile = false; } } @@ -201,7 +216,6 @@ public class TestCorruptedRegionStoreFile { HRegionServer rs = rst.getRegionServer(); rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName()); } - Thread.sleep(6000); } private int fullScanAndCount(final TableName tableName) throws Exception {