diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5c117f06c80..39940279527 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1103,7 +1103,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public long getNumMutationsWithoutWAL() {
     return numMutationsWithoutWAL.get();
   }
-  
+
   @Override
   public long getDataInMemoryWithoutWAL() {
     return dataInMemoryWithoutWAL.get();
@@ -2360,7 +2360,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     LOG.info(msg);
     status.setStatus(msg);
 
-    return new FlushResultImpl(compactionRequested ? 
+    return new FlushResultImpl(compactionRequested ?
         FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
           FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
         flushOpSeqId);
@@ -4713,7 +4713,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public boolean refreshStoreFiles() throws IOException {
-    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+    return refreshStoreFiles(false);
+  }
+
+  protected boolean refreshStoreFiles(boolean force) throws IOException {
+    if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return false; // if primary nothing to do
     }
 
@@ -5192,15 +5196,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * If the joined heap data gathering is interrupted due to scan limits, this will
      * contain the row for which we are populating the values.*/
     protected Cell joinedContinuationRow = null;
-    protected final byte[] stopRow;
-    private final FilterWrapper filter;
-    private ScannerContext defaultScannerContext;
-    protected int isScan;
     private boolean filterClosed = false;
-    private long readPt;
-    private long maxResultSize;
-    protected HRegion region;
-    protected CellComparator comparator;
+
+    protected final int isScan;
+    protected final byte[] stopRow;
+    protected final HRegion region;
+    protected final CellComparator comparator;
+
+    private final long readPt;
+    private final long maxResultSize;
+    private final ScannerContext defaultScannerContext;
+    private final FilterWrapper filter;
 
     @Override
     public HRegionInfo getRegionInfo() {
@@ -5209,7 +5215,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
         throws IOException {
-      this.region = region;
       this.maxResultSize = scan.getMaxResultSize();
       if (scan.hasFilter()) {
         this.filter = new FilterWrapper(scan.getFilter());
@@ -5251,10 +5256,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         scanners.addAll(additionalScanners);
       }
 
-      for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
-          scan.getFamilyMap().entrySet()) {
+      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        KeyValueScanner scanner;
+        try {
+          scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        } catch (FileNotFoundException e) {
+          throw handleFileNotFound(e);
+        }
         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
             || this.filter.isFamilyEssential(entry.getKey())) {
           scanners.add(scanner);
@@ -5346,7 +5355,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         moreValues = nextInternal(tmpList, scannerContext);
         outResults.addAll(tmpList);
       }
-      
+
       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
@@ -5397,30 +5406,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
       // Scanning between column families and thus the scope is between cells
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
-      do {
-        // We want to maintain any progress that is made towards the limits while scanning across
-        // different column families. To do this, we toggle the keep progress flag on during calls
-        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
-        scannerContext.setKeepProgress(true);
-        heap.next(results, scannerContext);
-        scannerContext.setKeepProgress(tmpKeepProgress);
+      try {
+        do {
+          // We want to maintain any progress that is made towards the limits while scanning across
+          // different column families. To do this, we toggle the keep progress flag on during calls
+          // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+          scannerContext.setKeepProgress(true);
+          heap.next(results, scannerContext);
+          scannerContext.setKeepProgress(tmpKeepProgress);
 
-        nextKv = heap.peek();
-        moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
+          nextKv = heap.peek();
+          moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
 
-        if (scannerContext.checkBatchLimit(limitScope)) {
-          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
-        } else if (scannerContext.checkSizeLimit(limitScope)) {
-          ScannerContext.NextState state =
+          if (scannerContext.checkBatchLimit(limitScope)) {
+            return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+          } else if (scannerContext.checkSizeLimit(limitScope)) {
+            ScannerContext.NextState state =
               moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        } else if (scannerContext.checkTimeLimit(limitScope)) {
-          ScannerContext.NextState state =
+            return scannerContext.setScannerState(state).hasMoreValues();
+          } else if (scannerContext.checkTimeLimit(limitScope)) {
+            ScannerContext.NextState state =
               moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        }
-      } while (moreCellsInRow);
-      
+            return scannerContext.setScannerState(state).hasMoreValues();
+          }
+        } while (moreCellsInRow);
+      } catch (FileNotFoundException e) {
+        throw handleFileNotFound(e);
+      }
       return nextKv != null;
     }
@@ -5748,18 +5760,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       boolean result = false;
       startRegionOperation();
+      KeyValue kv = KeyValueUtil.createFirstOnRow(row);
       try {
-        KeyValue kv = KeyValueUtil.createFirstOnRow(row);
         // use request seek to make use of the lazy seek option. See HBASE-5520
         result = this.storeHeap.requestSeek(kv, true, true);
         if (this.joinedHeap != null) {
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
+      } catch (FileNotFoundException e) {
+        throw handleFileNotFound(e);
       } finally {
         closeRegionOperation();
      }
       return result;
     }
+
+    private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
+      // tries to refresh the store files, otherwise shutdown the RS.
+      // TODO: add support for abort() of a single region and trigger reassignment.
+      try {
+        region.refreshStoreFiles(true);
+        return new IOException("unable to read store file");
+      } catch (IOException e) {
+        String msg = "a store file got lost: " + fnfe.getMessage();
+        LOG.error("unable to refresh store files", e);
+        abortRegionServer(msg);
+        return new NotServingRegionException(
+          getRegionInfo().getRegionNameAsString() + " is closing");
+      }
+    }
+
+    private void abortRegionServer(String msg) throws IOException {
+      if (rsServices instanceof HRegionServer) {
+        ((HRegionServer)rsServices).abort(msg);
+      }
+      throw new UnsupportedOperationException("not able to abort RS after: " + msg);
+    }
   }
 
   // Utility methods
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index c0a5d5e8692..42a378da10d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -143,6 +144,8 @@ public class StoreFileScanner implements KeyValueScanner {
           skipKVsNewerThanReadpoint();
         }
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
     }
@@ -169,6 +172,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seek " + this + " to key " + key, ioe);
     }
@@ -193,6 +198,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not reseek " + this + " to key " + key,
           ioe);
@@ -451,6 +458,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seekToPreviousRow " + this + " to key "
           + key, ioe);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 00000000000..8c9312c0712
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.TestTableName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestCorruptedRegionStoreFile {
+  private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String FAMILY_NAME_STR = "f";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+  private static final int NUM_FILES = 25;
+  private static final int ROW_PER_FILE = 2000;
+  private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+  @Rule public TestTableName TEST_TABLE = new TestTableName();
+
+  private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+  private Path tableDir;
+  private int rowCount;
+
+  private static void setupConf(Configuration conf) {
+    conf.setLong("hbase.hstore.compaction.min", 20);
+    conf.setLong("hbase.hstore.compaction.max", 39);
+    conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+  }
+
+  private void setupTable(final TableName tableName) throws IOException {
+    // load the table
+    Table table = UTIL.createTable(tableName, FAMILY_NAME);
+    try {
+      rowCount = 0;
+      byte[] value = new byte[1024];
+      byte[] q = Bytes.toBytes("q");
+      while (rowCount < NUM_ROWS) {
+        Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+        put.setDurability(Durability.SKIP_WAL);
+        put.add(FAMILY_NAME, q, value);
+        table.put(put);
+
+        if ((rowCount++ % ROW_PER_FILE) == 0) {
+          // flush it
+          UTIL.getHBaseAdmin().flush(tableName);
+        }
+      }
+    } finally {
+      UTIL.getHBaseAdmin().flush(tableName);
+      table.close();
+    }
+
+    assertEquals(NUM_ROWS, rowCount);
+
+    // get the store file paths
+    storeFiles.clear();
+    tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+    FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+      @Override
+      public void storeFile(final String region, final String family, final String hfile)
+          throws IOException {
+        HFileLink link = HFileLink.build(UTIL.getConfiguration(), tableName, region, family, hfile);
+        storeFiles.add(link.getOriginPath());
+      }
+    });
+    assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+    LOG.info("store-files: " + storeFiles);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(2, 3);
+
+    setupTable(TEST_TABLE.getTableName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileDuringScan() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScanNext(Table table) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+      count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileAfterScannerInit() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScan(Table table, Scan scan) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+      count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  private void evictHFileCache(final Path hfile) throws Exception {
+    for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      HRegionServer rs = rst.getRegionServer();
+      rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+    }
+    Thread.sleep(6000);
+  }
+
+  private int fullScanAndCount(final TableName tableName) throws Exception {
+    return fullScanAndCount(tableName, new ScanInjector());
+  }
+
+  private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
+      throws Exception {
+    Table table = UTIL.getConnection().getTable(tableName);
+    int count = 0;
+    try {
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      scan.setCacheBlocks(false);
+      injector.beforeScan(table, scan);
+      ResultScanner scanner = table.getScanner(scan);
+      try {
+        while (true) {
+          injector.beforeScanNext(table);
+          Result result = scanner.next();
+          injector.afterScanNext(table, result);
+          if (result == null) break;
+          if ((count++ % (ROW_PER_FILE / 2)) == 0) {
+            LOG.debug("scan next " + count);
+          }
+        }
+      } finally {
+        scanner.close();
+        injector.afterScan(table);
+      }
+    } finally {
+      table.close();
+    }
+    return count;
+  }
+
+  private class ScanInjector {
+    protected void beforeScan(Table table, Scan scan) throws Exception {}
+    protected void beforeScanNext(Table table) throws Exception {}
+    protected void afterScanNext(Table table, Result result) throws Exception {}
+    protected void afterScan(Table table) throws Exception {}
+  }
+}
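
---
Reviewer note, not part of the patch: a minimal sketch of the client-visible
behavior this change aims at. With FileNotFoundException now propagated out of
StoreFileScanner and handled by RegionScannerImpl.handleFileNotFound(), a store
file lost mid-scan triggers region.refreshStoreFiles(true) on the server, and
the client sees a plain IOException it can retry instead of the RegionServer
dying on an unchecked exception. The table name "testtable" and the
single-retry loop below are hypothetical illustration only.

  import java.io.IOException;

  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ResultScanner;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.client.Table;

  public class ScanRetrySketch {
    public static void main(String[] args) throws Exception {
      try (Connection conn = ConnectionFactory.createConnection();
           Table table = conn.getTable(TableName.valueOf("testtable"))) {
        for (int attempt = 0; attempt < 2; attempt++) {
          try (ResultScanner scanner = table.getScanner(new Scan())) {
            Result result;
            while ((result = scanner.next()) != null) {
              // process the row; with the patch, a store file vanishing here
              // fails this attempt with IOException instead of killing the RS
            }
            break; // full scan completed
          } catch (IOException e) {
            // the server refreshed its store-file list in handleFileNotFound();
            // one retry covers the "file moved away" case the new test exercises
          }
        }
      }
    }
  }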