diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 6ad5cedf308..d7867d11e81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -22,6 +22,7 @@
 import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -215,18 +216,12 @@ public class FSDataInputStreamWrapper {
 
   /** Close stream(s) if necessary. */
   public void close() throws IOException {
-    if (!doCloseStreams) return;
-    try {
-      if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
-        streamNoFsChecksum.close();
-        streamNoFsChecksum = null;
-      }
-    } finally {
-      if (stream != null) {
-        stream.close();
-        stream = null;
-      }
+    if (!doCloseStreams) {
+      return;
     }
+    // we do not care about the close exception as it is for reading, no data loss issue.
+    IOUtils.closeQuietly(streamNoFsChecksum);
+    IOUtils.closeQuietly(stream);
   }
 
   public HFileSystem getHfs() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f10d05517f5..719f234daef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -891,7 +891,7 @@ public class HStore implements Store {
           storeEngine.getStoreFileManager().clearCompactedFiles();
       // clear the compacted files
       if (compactedfiles != null && !compactedfiles.isEmpty()) {
-        removeCompactedfiles(compactedfiles);
+        removeCompactedfiles(compactedfiles, true);
       }
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
@@ -2751,6 +2751,11 @@ public class HStore implements Store {
 
   @Override
   public synchronized void closeAndArchiveCompactedFiles() throws IOException {
+    closeAndArchiveCompactedFiles(false);
+  }
+
+  @VisibleForTesting
+  public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException {
     // ensure other threads do not attempt to archive the same files on close()
     archiveLock.lock();
     try {
@@ -2772,7 +2777,7 @@ public class HStore implements Store {
         lock.readLock().unlock();
       }
       if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
-        removeCompactedfiles(copyCompactedfiles);
+        removeCompactedfiles(copyCompactedfiles, storeClosing);
       }
     } finally {
       archiveLock.unlock();
@@ -2784,20 +2789,38 @@ public class HStore implements Store {
    * @param compactedfiles The compacted files in this store that are not active in reads
    * @throws IOException
    */
-  private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
+  private void removeCompactedfiles(Collection<StoreFile> compactedfiles, boolean storeClosing)
       throws IOException {
     final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
     for (final StoreFile file : compactedfiles) {
       synchronized (file) {
         try {
           StoreFile.Reader r = file.getReader();
+
+          //Compacted files in the list should always be marked compacted away. In the event
+          //they contradict, should we choose one and ignore the other in order to guarantee
+          //data consistency?
+          if (storeClosing && r != null && !r.isCompactedAway()) {
+            String msg =
+                "Region closing but StoreFile is in compacted list but not compacted away: "
+                    + file.getPath();
+            throw new IllegalStateException(msg);
+          }
+
           if (r == null) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("The file " + file + " was closed but still not archived.");
             }
             filesToRemove.add(file);
           }
-          if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+
+          //If the store is closing we ignore any references in order to keep things consistent
+          //and remove the compacted storefiles from the region directory
+          if (r != null && file.isCompactedAway() && (!r.isReferencedInReads() || storeClosing)) {
+            if (storeClosing && r.isReferencedInReads()) {
+              LOG.warn("Region closing but StoreFile still has references: file="
+                  + file.getPath() + ", refCount=" + r.getRefCount());
+            }
             // Even if deleting fails we need not bother as any new scanners won't be
             // able to use the compacted file as the status is already compactedAway
             if (LOG.isTraceEnabled()) {
@@ -2808,13 +2831,21 @@ public class HStore implements Store {
             filesToRemove.add(file);
           } else {
             LOG.info("Can't archive compacted file " + file.getPath()
-                + " because of either isCompactedAway = " + r.isCompactedAway()
-                + " or file has reference, isReferencedInReads = " + r.isReferencedInReads()
-                + ", skipping for now.");
+                + " because of either isCompactedAway=" + r.isCompactedAway()
+                + " or file has reference, isReferencedInReads=" + r.isReferencedInReads()
+                + ", refCount=" + r.getRefCount() + ", skipping for now.");
           }
         } catch (Exception e) {
-          LOG.error(
-            "Exception while trying to close the compacted store file " + file.getPath().getName());
+          String msg = "Exception while trying to close the compacted store file "
+              + file.getPath();
+          if (storeClosing) {
+            msg = "Store is closing. " + msg;
+          }
+          LOG.error(msg, e);
+          //if we get an exception let the caller know so it can abort the server
+          if (storeClosing) {
+            throw new IOException(msg, e);
+          }
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index bf20c041c5a..4c5cd7ca1e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -1223,6 +1223,14 @@ public class StoreFile {
           reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
     }
 
+    /**
+     * Return the ref count associated with the reader. The ref count is incremented whenever a
+     * scanner associated with the reader is opened.
+     */
+    int getRefCount() {
+      return refCount.get();
+    }
+
     /**
      * Increment the ref count associated with the reader when ever a scanner associated with the
      * reader is opened
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
new file mode 100644
index 00000000000..dfc92093047
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
@@ -0,0 +1,192 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static junit.framework.TestCase.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class})
+public class TestCleanupCompactedFileOnRegionClose {
+
+  private static HBaseTestingUtility util;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    util.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,100);
+    util.getConfiguration().set("dfs.blocksize", "64000");
+    util.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
+    util.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY,"0");
+    util.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void afterclass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCleanupOnClose() throws Exception {
+    TableName tableName = TableName.valueOf("testCleanupOnClose");
+    String familyName = "f";
+    byte[] familyNameBytes = Bytes.toBytes(familyName);
+    util.createTable(tableName, familyName);
+
+    HBaseAdmin hBaseAdmin = util.getHBaseAdmin();
+    Table table = util.getConnection().getTable(tableName);
+
+    HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
+    Region region = rs.getOnlineRegions(tableName).get(0);
+
+    int refSFCount = 4;
+    for (int i = 0; i < refSFCount; i++) {
+      for (int j = 0; j < refSFCount; j++) {
+        Put put = new Put(Bytes.toBytes(j));
+        put.addColumn(familyNameBytes, Bytes.toBytes(i), Bytes.toBytes(j));
+        table.put(put);
+      }
+      util.flush(tableName);
+    }
+    assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+
+    //add a delete, to test whether we end up with an inconsistency post region close
+    Delete delete = new Delete(Bytes.toBytes(refSFCount-1));
+    table.delete(delete);
+    util.flush(tableName);
+    assertFalse(table.exists(new Get(Bytes.toBytes(refSFCount-1))));
+
+    //Create a scanner and keep it open to add references to StoreFileReaders
+    Scan scan = new Scan();
+    scan.setStopRow(Bytes.toBytes(refSFCount-2));
+    scan.setCaching(1);
+    ResultScanner scanner = table.getScanner(scan);
+    Result res = scanner.next();
+    assertNotNull(res);
+    assertEquals(refSFCount, res.getFamilyMap(familyNameBytes).size());
+
+
+    //Verify the references
+    int count = 0;
+    for (StoreFile sf : (Collection<StoreFile>)region.getStore(familyNameBytes).getStorefiles()) {
+      synchronized (sf) {
+        if (count < refSFCount) {
+          assertTrue(sf.getReader().isReferencedInReads());
+        } else {
+          assertFalse(sf.getReader().isReferencedInReads());
+        }
+      }
+      count++;
+    }
+
+    //Major compact to produce compacted storefiles that need to be cleaned up
+    util.compact(tableName, true);
+    assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+    assertEquals(refSFCount+1,
+        ((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
+            .getCompactedfiles().size());
+
+    //close then open the region to determine whether compacted storefiles get cleaned up on close
+    hBaseAdmin.unassign(region.getRegionInfo().getRegionName(), false);
+    hBaseAdmin.assign(region.getRegionInfo().getRegionName());
+    util.waitUntilNoRegionsInTransition(10000);
+
+
+    assertFalse("Deleted row should not exist",
+        table.exists(new Get(Bytes.toBytes(refSFCount-1))));
+
+    rs = util.getRSForFirstRegionInTable(tableName);
+    region = rs.getOnlineRegions(tableName).get(0);
+    assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+    assertNull(((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
+        .getCompactedfiles());
+  }
+
+  @Test
+  public void testIOExceptionThrownOnClose() throws Exception {
+    byte[] filler = new byte[128000];
+    TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
+    String familyName = "f";
+    byte[] familyNameBytes = Bytes.toBytes(familyName);
+    util.createTable(tableName, familyName);
+
+    Table table = util.getConnection().getTable(tableName);
+
+    HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
+    Region region = rs.getOnlineRegions(tableName).get(0);
+
+    int refSFCount = 4;
+    for (int i = 0; i < refSFCount; i++) {
+      for (int j = 0; j < refSFCount; j++) {
+        Put put = new Put(Bytes.toBytes(j));
+        put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
+        table.put(put);
+      }
+      util.flush(tableName);
+    }
+    assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+
+    HStore store = (HStore)region.getStore(familyNameBytes);
+    StoreFile hsf = region.getStore(familyNameBytes).getStorefiles().iterator().next();
+    long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
+    StoreFileScanner preadScanner = hsf.getReader().getStoreFileScanner(
+        false, true, false, readPt, 0, false);
+    preadScanner.seek(KeyValue.LOWESTKEY);
+
+    //Major compact to produce compacted storefiles that need to be cleaned up
+    util.compact(tableName, true);
+    assertNotNull(preadScanner.next());
+    store.closeAndArchiveCompactedFiles(true);
+
+    try {
+      assertNotNull(preadScanner.next());
+      fail("Expected IOException");
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
+  }
+}