diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 80f1f21c836..39d0cbb9610 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -924,7 +924,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
           storeEngine.getStoreFileManager().clearCompactedFiles();
       // clear the compacted files
       if (CollectionUtils.isNotEmpty(compactedfiles)) {
-        removeCompactedfiles(compactedfiles);
+        removeCompactedfiles(compactedfiles, true);
       }
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
@@ -2531,6 +2531,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
    * Closes and archives the compacted files under this store
    */
   public synchronized void closeAndArchiveCompactedFiles() throws IOException {
+    closeAndArchiveCompactedFiles(false);
+  }
+
+  @VisibleForTesting
+  public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException {
     // ensure other threads do not attempt to archive the same files on close()
     archiveLock.lock();
     try {
@@ -2549,7 +2554,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
        lock.readLock().unlock();
      }
      if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
-       removeCompactedfiles(copyCompactedfiles);
+       removeCompactedfiles(copyCompactedfiles, storeClosing);
      }
    } finally {
      archiveLock.unlock();
@@ -2561,7 +2566,7 @@
    * @param compactedfiles The compacted files in this store that are not active in reads
    * @throws IOException
    */
-  private void removeCompactedfiles(Collection<HStoreFile> compactedfiles)
+  private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean storeClosing)
       throws IOException {
     final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
     for (final HStoreFile file : compactedfiles) {
@@ -2573,11 +2578,29 @@
             filesToRemove.add(file);
             continue;
           }
-          if (file.isCompactedAway() && !file.isReferencedInReads()) {
+
+          //Compacted files in the list should always be marked compacted away. In the event
+          //they contradict each other, should we choose one and ignore the other in order
+          //to guarantee data consistency?
+          if (storeClosing && !file.isCompactedAway()) {
+            String msg =
+              "Region closing but StoreFile is in compacted list but not compacted away: " +
+                file.getPath().getName();
+            throw new IllegalStateException(msg);
+          }
+
+          //If the store is closing we ignore any remaining references so we stay consistent
+          //and remove the compacted storefiles from the region directory
+          if (file.isCompactedAway() && (!file.isReferencedInReads() || storeClosing)) {
+            if (storeClosing && file.isReferencedInReads()) {
+              LOG.debug("Region closing but StoreFile still has references: {}",
+                file.getPath().getName());
+            }
             // Even if deleting fails we need not bother as any new scanners won't be
             // able to use the compacted file as the status is already compactedAway
             LOG.trace("Closing and archiving the file {}", file);
             r.close(true);
+            file.closeStreamReaders(true);
             // Just close and return
             filesToRemove.add(file);
           } else {
@@ -2587,8 +2610,16 @@
               + ", skipping for now.");
           }
         } catch (Exception e) {
-          LOG.error("Exception while trying to close the compacted store file {}",
-            file.getPath(), e);
+          String msg = "Exception while trying to close the compacted store file " +
+            file.getPath().getName();
+          if (storeClosing) {
+            msg = "Store is closing. " + msg;
+          }
+          LOG.error(msg, e);
+          //if we get an exception let the caller know so it can abort the server
+          if (storeClosing) {
+            throw new IOException(msg, e);
+          }
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 1950a721051..4a0c66f5015 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,10 +40,14 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
+
 import org.apache.yetus.audience.InterfaceAudience;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
 /**
  * A Store data file. Stores usually have one or more of these files. They
@@ -57,7 +63,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  * writer and a reader is that we write once but read a lot more.
  */
 @InterfaceAudience.Private
-public class HStoreFile implements StoreFile {
+public class HStoreFile implements StoreFile, StoreFileReader.Listener {
 
   private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
 
@@ -116,6 +122,9 @@ public class HStoreFile implements StoreFile {
   // done.
   private final AtomicInteger refCount = new AtomicInteger(0);
 
+  // Set implementation must be of concurrent type
+  private final Set<StoreFileReader> streamReaders;
+
   private final boolean noReadahead;
 
   private final boolean primaryReplica;
@@ -219,6 +228,7 @@
    */
   public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
       BloomType cfBloomType, boolean primaryReplica) {
+    this.streamReaders = ConcurrentHashMap.newKeySet();
     this.fs = fs;
     this.fileInfo = fileInfo;
     this.cacheConf = cacheConf;
@@ -502,8 +512,13 @@
   public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
       boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
       throws IOException {
-    return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false,
+    StoreFileReader reader = createStreamReader(canUseDropBehind);
+    reader.setListener(this);
+    StoreFileScanner sfScanner = reader.getStoreFileScanner(cacheBlocks, false,
       isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
+    //Add the reader once the scanner is created
+    streamReaders.add(reader);
+    return sfScanner;
   }
 
   /**
@@ -523,6 +538,19 @@
       this.reader.close(evictOnClose);
       this.reader = null;
     }
+    closeStreamReaders(evictOnClose);
+  }
+
+  public void closeStreamReaders(boolean evictOnClose) throws IOException {
+    synchronized (this) {
+      for (StoreFileReader entry : streamReaders) {
+        //closing the reader removes it from streamReaders via the Listener callback
+        entry.close(evictOnClose);
+      }
+      int size = streamReaders.size();
+      Preconditions.checkState(size == 0,
+        "There are still streamReaders post close: " + size);
+    }
   }
 
   /**
@@ -589,4 +617,9 @@
     TimeRange tr = getReader().timeRange;
     return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
   }
+
+  @Override
+  public void storeFileReaderClosed(StoreFileReader reader) {
+    streamReaders.remove(reader);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index aeff1f8b059..9080c2db84a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -85,6 +85,10 @@ public class StoreFileReader {
   @VisibleForTesting
   final boolean shared;
 
+  private volatile Listener listener;
+
+  private boolean closed = false;
+
   private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
     this.reader = reader;
     bloomFilterType = BloomType.NONE;
@@ -209,7 +213,16 @@
   }
 
   public void close(boolean evictOnClose) throws IOException {
-    reader.close(evictOnClose);
+    synchronized (this) {
+      if (closed) {
+        return;
+      }
+      reader.close(evictOnClose);
+      closed = true;
+    }
+    if (listener != null) {
+      listener.storeFileReaderClosed(this);
+    }
   }
 
   /**
@@ -644,4 +657,12 @@
   void setSkipResetSeqId(boolean skipResetSeqId) {
     this.skipResetSeqId = skipResetSeqId;
   }
+
+  public void setListener(Listener listener) {
+    this.listener = listener;
+  }
+
+  public interface Listener {
+    void storeFileReaderClosed(StoreFileReader reader);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
new file mode 100644
index 00000000000..564e5c7c388
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class})
+public class TestCleanupCompactedFileOnRegionClose {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestCleanupCompactedFileOnRegionClose.class);
+
+  private static HBaseTestingUtility util;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    util.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,100);
+    util.getConfiguration().set("dfs.blocksize", "64000");
+    util.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
+    util.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY,"0");
+    util.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void afterclass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCleanupOnClose() throws Exception {
+    TableName tableName = TableName.valueOf("testCleanupOnClose");
+    String familyName = "f";
+    byte[] familyNameBytes = Bytes.toBytes(familyName);
+    util.createTable(tableName, familyName);
+
+    HBaseAdmin hBaseAdmin = util.getHBaseAdmin();
+    Table table = util.getConnection().getTable(tableName);
+
+    HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
+    Region region = rs.getRegions(tableName).get(0);
+
+    int refSFCount = 4;
+    for (int i = 0; i < refSFCount; i++) {
+      for (int j = 0; j < refSFCount; j++) {
+        Put put = new Put(Bytes.toBytes(j));
+        put.addColumn(familyNameBytes, Bytes.toBytes(i), Bytes.toBytes(j));
+        table.put(put);
+      }
+      util.flush(tableName);
+    }
+    assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+
+    //add a delete, to test whether we end up with an inconsistency post region close
+    Delete delete = new Delete(Bytes.toBytes(refSFCount-1));
+    table.delete(delete);
+    util.flush(tableName);
+    assertFalse(table.exists(new Get(Bytes.toBytes(refSFCount-1))));
+
+    //Create a scanner and keep it open to add references to StoreFileReaders
+    Scan scan = new Scan();
+    scan.setStopRow(Bytes.toBytes(refSFCount-2));
+    scan.setCaching(1);
+    ResultScanner scanner = table.getScanner(scan);
+    Result res = scanner.next();
+    assertNotNull(res);
+    assertEquals(refSFCount, res.getFamilyMap(familyNameBytes).size());
+
+
+    //Verify the references
+    int count = 0;
+    for (HStoreFile sf : (Collection<HStoreFile>)region.getStore(familyNameBytes).getStorefiles()) {
+      synchronized (sf) {
+        if (count < refSFCount) {
+          assertTrue(sf.isReferencedInReads());
+        } else {
+          assertFalse(sf.isReferencedInReads());
+        }
+      }
+      count++;
+    }
+
+    //Major compact to produce compacted storefiles that need to be cleaned up
+    util.compact(tableName, true);
+    assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+    assertEquals(refSFCount+1,
+        ((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
+            .getCompactedfiles().size());
+
+    //close then open the region to determine whether compacted storefiles get cleaned up on close
+    hBaseAdmin.unassign(region.getRegionInfo().getRegionName(), false);
+    hBaseAdmin.assign(region.getRegionInfo().getRegionName());
+    util.waitUntilNoRegionsInTransition(10000);
+
+
+    assertFalse("Deleted row should not exist",
+        table.exists(new Get(Bytes.toBytes(refSFCount-1))));
+
+    rs = util.getRSForFirstRegionInTable(tableName);
+    region = rs.getRegions(tableName).get(0);
+    assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+    assertEquals(0,
+        ((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
+            .getCompactedfiles().size());
+  }
+
+  @Test
+  public void testIOExceptionThrownOnClose() throws Exception {
+    byte[] filler = new byte[128000];
+    TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
+    String familyName = "f";
+    byte[] familyNameBytes = Bytes.toBytes(familyName);
+    util.createTable(tableName, familyName);
+
+    Table table = util.getConnection().getTable(tableName);
+
+    HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
+    Region region = rs.getRegions(tableName).get(0);
+
+    int refSFCount = 4;
+    for (int i = 0; i < refSFCount; i++) {
+      for (int j = 0; j < refSFCount; j++) {
+        Put put = new Put(Bytes.toBytes(j));
+        put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
+        table.put(put);
+      }
+      util.flush(tableName);
+    }
+    assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+
+    HStore store = ((HRegion) region).getStore(familyNameBytes);
+    HStoreFile hsf = ((Collection<HStoreFile>)region.getStore(familyNameBytes).getStorefiles())
+        .iterator().next();
+    long readPt = ((HRegion)region).getReadPoint(IsolationLevel.READ_COMMITTED);
+    StoreFileScanner preadScanner = hsf.getPreadScanner(false, readPt, 0, false);
+    StoreFileScanner streamScanner =
+        hsf.getStreamScanner(false, false, false, readPt, 0, false);
+    preadScanner.seek(KeyValue.LOWESTKEY);
+    streamScanner.seek(KeyValue.LOWESTKEY);
+
+    //Major compact to produce compacted storefiles that need to be cleaned up
+    util.compact(tableName, true);
+    assertNotNull(preadScanner.next());
+    assertNotNull(streamScanner.next());
+    store.closeAndArchiveCompactedFiles(true);
+
+    try {
+      assertNotNull(preadScanner.next());
+      fail("Expected IOException");
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
+
+    //Wait a bit for file to be remove from
+    try {
+      assertNotNull(streamScanner.next());
+      fail("Expected IOException");
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
+  }
+}