HBASE-20704 Sometimes some compacted storefiles are not archived on region close

Author: Francis Liu
Date:   2018-08-12 21:27:03 -07:00
parent e86c736028
commit cdfe808892
4 changed files with 304 additions and 9 deletions

org/apache/hadoop/hbase/regionserver/HStore.java

@@ -927,7 +927,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
      storeEngine.getStoreFileManager().clearCompactedFiles();
      // clear the compacted files
      if (CollectionUtils.isNotEmpty(compactedfiles)) {
-       removeCompactedfiles(compactedfiles);
+       removeCompactedfiles(compactedfiles, true);
      }
      if (!result.isEmpty()) {
        // initialize the thread pool for closing store files in parallel.
@@ -2575,6 +2575,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   * Closes and archives the compacted files under this store
   */
  public synchronized void closeAndArchiveCompactedFiles() throws IOException {
+    closeAndArchiveCompactedFiles(false);
+  }
+
+  @VisibleForTesting
+  public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException {
    // ensure other threads do not attempt to archive the same files on close()
    archiveLock.lock();
    try {
@@ -2593,7 +2598,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
        lock.readLock().unlock();
      }
      if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
-       removeCompactedfiles(copyCompactedfiles);
+       removeCompactedfiles(copyCompactedfiles, storeClosing);
      }
    } finally {
      archiveLock.unlock();
@@ -2604,7 +2609,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   * Archives and removes the compacted files
   * @param compactedfiles The compacted files in this store that are not active in reads
   */
- private void removeCompactedfiles(Collection<HStoreFile> compactedfiles)
+ private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean storeClosing)
      throws IOException {
    final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
@@ -2622,13 +2627,31 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
          storeFileSizes.add(length);
          continue;
        }
-       if (file.isCompactedAway() && !file.isReferencedInReads()) {
+       // Files in the compacted list should always be marked compacted away. If the two
+       // flags ever contradict each other, should we trust one and ignore the other in
+       // order to guarantee data consistency?
+       if (storeClosing && !file.isCompactedAway()) {
+         String msg =
+             "Region closing but StoreFile is in compacted list but not compacted away: " +
+             file.getPath().getName();
+         throw new IllegalStateException(msg);
+       }
+       // If the store is closing we ignore any remaining references so that we stay
+       // consistent and remove the compacted storefiles from the region directory
+       if (file.isCompactedAway() && (!file.isReferencedInReads() || storeClosing)) {
+         if (storeClosing && file.isReferencedInReads()) {
+           LOG.debug("Region closing but StoreFile still has references: {}",
+               file.getPath().getName());
+         }
          // Even if deleting fails we need not bother as any new scanners won't be
          // able to use the compacted file as the status is already compactedAway
          LOG.trace("Closing and archiving the file {}", file);
          // Copy the file size before closing the reader
          final long length = r.length();
          r.close(true);
+         file.closeStreamReaders(true);
          // Just close and return
          filesToRemove.add(file);
          // Only add the length if we successfully added the file to `filesToRemove`
@@ -2640,8 +2663,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
              + ", skipping for now.");
        }
      } catch (Exception e) {
-       LOG.error("Exception while trying to close the compacted store file {}",
-           file.getPath(), e);
+       String msg = "Exception while trying to close the compacted store file " +
+           file.getPath().getName();
+       if (storeClosing) {
+         msg = "Store is closing. " + msg;
+       }
+       LOG.error(msg, e);
+       // If we get an exception, let the caller know so it can abort the server
+       if (storeClosing) {
+         throw new IOException(msg, e);
+       }
      }
    }
  }
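In outline, the hunks above turn removeCompactedfiles from "archive only unreferenced files" into a mode-aware decision: the background chore still defers files that scanners reference, while the region-close path archives them regardless and treats an unmarked file as a fatal inconsistency. The standalone model below restates that decision; ArchiveDecision and shouldArchive are hypothetical names used only for illustration, not HBase code.

// Hypothetical standalone model of the archiving decision introduced above.
final class ArchiveDecision {
  static boolean shouldArchive(boolean compactedAway, boolean referencedInReads,
      boolean storeClosing) {
    if (storeClosing && !compactedAway) {
      // On close, a file in the compacted list that is not marked compacted
      // away signals an inconsistency: fail fast rather than risk data loss.
      throw new IllegalStateException("in compacted list but not compacted away");
    }
    // Chore path: skip files that readers still reference.
    // Close path: archive regardless, so nothing is left in the region dir.
    return compactedAway && (!referencedInReads || storeClosing);
  }

  public static void main(String[] args) {
    System.out.println(shouldArchive(true, true, false)); // false: chore defers
    System.out.println(shouldArchive(true, true, true));  // true: close archives anyway
  }
}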

org/apache/hadoop/hbase/regionserver/HStoreFile.java

@@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,10 +40,14 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
 * A Store data file. Stores usually have one or more of these files. They
@@ -57,7 +63,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 * writer and a reader is that we write once but read a lot more.
 */
@InterfaceAudience.Private
-public class HStoreFile implements StoreFile {
+public class HStoreFile implements StoreFile, StoreFileReader.Listener {

  private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
@@ -116,6 +122,9 @@ public class HStoreFile implements StoreFile {
  // done.
  private final AtomicInteger refCount = new AtomicInteger(0);

+  // Set implementation must be of concurrent type
+  private final Set<StoreFileReader> streamReaders;

  private final boolean noReadahead;

  private final boolean primaryReplica;
@@ -219,6 +228,7 @@ public class HStoreFile implements StoreFile {
   */
  public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
      BloomType cfBloomType, boolean primaryReplica) {
+    this.streamReaders = ConcurrentHashMap.newKeySet();
    this.fs = fs;
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;
@@ -502,8 +512,13 @@ public class HStoreFile implements StoreFile {
  public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
      throws IOException {
-    return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false,
-        isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
+    StoreFileReader reader = createStreamReader(canUseDropBehind);
+    reader.setListener(this);
+    StoreFileScanner sfScanner = reader.getStoreFileScanner(cacheBlocks, false,
+        isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
+    // Add the reader once the scanner has been created
+    streamReaders.add(reader);
+    return sfScanner;
  }

  /**
@@ -523,6 +538,19 @@ public class HStoreFile implements StoreFile {
      this.reader.close(evictOnClose);
      this.reader = null;
    }
+    closeStreamReaders(evictOnClose);
+  }
+
+  public void closeStreamReaders(boolean evictOnClose) throws IOException {
+    synchronized (this) {
+      for (StoreFileReader entry : streamReaders) {
+        // Closing a reader removes it from streamReaders via the Listener callback
+        entry.close(evictOnClose);
+      }
+      int size = streamReaders.size();
+      Preconditions.checkState(size == 0,
+          "There are still streamReaders post close: " + size);
+    }
  }

  /**
@@ -589,4 +617,9 @@ public class HStoreFile implements StoreFile {
    TimeRange tr = getReader().timeRange;
    return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
  }
+
+  @Override
+  public void storeFileReaderClosed(StoreFileReader reader) {
+    streamReaders.remove(reader);
+  }
}
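The net effect of the HStoreFile changes is an ownership protocol: every stream reader handed out by getStreamScanner is registered in a concurrent set, deregisters itself through the listener callback when closed, and closeStreamReaders drains whatever remains. A minimal standalone sketch of that protocol, with hypothetical FileHandle and TrackedReader types standing in for HStoreFile and StoreFileReader:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Owner side: track handed-out readers, drain them on close.
final class FileHandle {
  // Concurrent set, since readers may close from scanner threads
  // while the owner iterates in closeStreamReaders().
  private final Set<TrackedReader> streamReaders = ConcurrentHashMap.newKeySet();

  TrackedReader openStreamReader() {
    TrackedReader reader = new TrackedReader(streamReaders::remove);
    streamReaders.add(reader); // register once fully constructed
    return reader;
  }

  void closeStreamReaders() {
    for (TrackedReader reader : streamReaders) {
      reader.close(); // each close removes the reader via its callback
    }
    if (!streamReaders.isEmpty()) {
      throw new IllegalStateException(
          "There are still streamReaders post close: " + streamReaders.size());
    }
  }

  public static void main(String[] args) {
    FileHandle handle = new FileHandle();
    TrackedReader r1 = handle.openStreamReader();
    handle.openStreamReader();
    r1.close();                  // deregisters r1 immediately
    handle.closeStreamReaders(); // closes the remaining reader; set ends empty
  }
}

// Reader side: report back to the owner exactly once, on close.
final class TrackedReader {
  private final Consumer<TrackedReader> onClose;
  private boolean closed = false;

  TrackedReader(Consumer<TrackedReader> onClose) {
    this.onClose = onClose;
  }

  void close() {
    synchronized (this) {
      if (closed) {
        return;
      }
      closed = true; // real code releases file resources here
    }
    onClose.accept(this); // notify outside the monitor
  }
}

The Preconditions.checkState in the real closeStreamReaders plays the same role as the IllegalStateException here: any reader that survives the drain indicates a tracking bug.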

org/apache/hadoop/hbase/regionserver/StoreFileReader.java

@@ -85,6 +85,10 @@ public class StoreFileReader {
  @VisibleForTesting
  final boolean shared;

+  private volatile Listener listener;
+
+  private boolean closed = false;
+
  private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
    this.reader = reader;
    bloomFilterType = BloomType.NONE;
@@ -209,7 +213,16 @@ public class StoreFileReader {
  }

  public void close(boolean evictOnClose) throws IOException {
-    reader.close(evictOnClose);
+    synchronized (this) {
+      if (closed) {
+        return;
+      }
+      reader.close(evictOnClose);
+      closed = true;
+    }
+    if (listener != null) {
+      listener.storeFileReaderClosed(this);
+    }
  }

  /**
@@ -644,4 +657,12 @@ public class StoreFileReader {
  void setSkipResetSeqId(boolean skipResetSeqId) {
    this.skipResetSeqId = skipResetSeqId;
  }
+
+  public void setListener(Listener listener) {
+    this.listener = listener;
+  }
+
+  public interface Listener {
+    void storeFileReaderClosed(StoreFileReader reader);
+  }
}
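Two details of the reworked close(boolean) are easy to miss: the closed flag makes the method idempotent, which matters because the same reader can be closed by its own scanner and again by HStoreFile.closeStreamReaders, and the listener is notified outside the synchronized block so the callback can mutate the owner's set without holding the reader's monitor. A tiny hypothetical demonstration of that contract (not HBase code):

import java.util.concurrent.atomic.AtomicInteger;

// The second close() is a no-op and the listener fires exactly once.
public class DoubleCloseDemo {
  interface Listener { void closed(); }

  static final class Resource {
    private volatile Listener listener;
    private boolean closed = false;

    void setListener(Listener listener) { this.listener = listener; }

    void close() {
      synchronized (this) {
        if (closed) {
          return;      // already closed: nothing to do
        }
        closed = true; // release underlying resources here
      }
      if (listener != null) {
        listener.closed(); // outside the monitor, as in StoreFileReader
      }
    }
  }

  public static void main(String[] args) {
    AtomicInteger notified = new AtomicInteger();
    Resource r = new Resource();
    r.setListener(notified::incrementAndGet);
    r.close();
    r.close(); // no second notification
    System.out.println("notifications=" + notified.get()); // prints 1
  }
}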

org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java (new file)

@@ -0,0 +1,210 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({MediumTests.class})
public class TestCleanupCompactedFileOnRegionClose {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCleanupCompactedFileOnRegionClose.class);
private static HBaseTestingUtility util;
@BeforeClass
public static void beforeClass() throws Exception {
util = new HBaseTestingUtility();
util.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,100);
util.getConfiguration().set("dfs.blocksize", "64000");
util.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
util.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY,"0");
util.startMiniCluster(2);
}
@AfterClass
public static void afterclass() throws Exception {
util.shutdownMiniCluster();
}
@Test
public void testCleanupOnClose() throws Exception {
TableName tableName = TableName.valueOf("testCleanupOnClose");
String familyName = "f";
byte[] familyNameBytes = Bytes.toBytes(familyName);
util.createTable(tableName, familyName);
HBaseAdmin hBaseAdmin = util.getHBaseAdmin();
Table table = util.getConnection().getTable(tableName);
HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
Region region = rs.getRegions(tableName).get(0);
int refSFCount = 4;
for (int i = 0; i < refSFCount; i++) {
for (int j = 0; j < refSFCount; j++) {
Put put = new Put(Bytes.toBytes(j));
put.addColumn(familyNameBytes, Bytes.toBytes(i), Bytes.toBytes(j));
table.put(put);
}
util.flush(tableName);
}
assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
//add a delete, to test whether we end up with an inconsistency post region close
Delete delete = new Delete(Bytes.toBytes(refSFCount-1));
table.delete(delete);
util.flush(tableName);
assertFalse(table.exists(new Get(Bytes.toBytes(refSFCount-1))));
//Create a scanner and keep it open to add references to StoreFileReaders
Scan scan = new Scan();
scan.setStopRow(Bytes.toBytes(refSFCount-2));
scan.setCaching(1);
ResultScanner scanner = table.getScanner(scan);
Result res = scanner.next();
assertNotNull(res);
assertEquals(refSFCount, res.getFamilyMap(familyNameBytes).size());
//Verify the references
int count = 0;
for (HStoreFile sf : (Collection<HStoreFile>)region.getStore(familyNameBytes).getStorefiles()) {
synchronized (sf) {
if (count < refSFCount) {
assertTrue(sf.isReferencedInReads());
} else {
assertFalse(sf.isReferencedInReads());
}
}
count++;
}
//Major compact to produce compacted storefiles that need to be cleaned up
util.compact(tableName, true);
assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
assertEquals(refSFCount+1,
((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
.getCompactedfiles().size());
//close then open the region to determine whether compacted storefiles get cleaned up on close
hBaseAdmin.unassign(region.getRegionInfo().getRegionName(), false);
hBaseAdmin.assign(region.getRegionInfo().getRegionName());
util.waitUntilNoRegionsInTransition(10000);
assertFalse("Deleted row should not exist",
table.exists(new Get(Bytes.toBytes(refSFCount-1))));
rs = util.getRSForFirstRegionInTable(tableName);
region = rs.getRegions(tableName).get(0);
assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
assertEquals(0,
((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
.getCompactedfiles().size());
}
@Test
public void testIOExceptionThrownOnClose() throws Exception {
byte[] filler = new byte[128000];
TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
String familyName = "f";
byte[] familyNameBytes = Bytes.toBytes(familyName);
util.createTable(tableName, familyName);
Table table = util.getConnection().getTable(tableName);
HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
Region region = rs.getRegions(tableName).get(0);
int refSFCount = 4;
for (int i = 0; i < refSFCount; i++) {
for (int j = 0; j < refSFCount; j++) {
Put put = new Put(Bytes.toBytes(j));
put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
table.put(put);
}
util.flush(tableName);
}
assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
HStore store = ((HRegion) region).getStore(familyNameBytes);
HStoreFile hsf = ((Collection<HStoreFile>)region.getStore(familyNameBytes).getStorefiles())
.iterator().next();
long readPt = ((HRegion)region).getReadPoint(IsolationLevel.READ_COMMITTED);
StoreFileScanner preadScanner = hsf.getPreadScanner(false, readPt, 0, false);
StoreFileScanner streamScanner =
hsf.getStreamScanner(false, false, false, readPt, 0, false);
preadScanner.seek(KeyValue.LOWESTKEY);
streamScanner.seek(KeyValue.LOWESTKEY);
//Major compact to produce compacted storefiles that need to be cleaned up
util.compact(tableName, true);
assertNotNull(preadScanner.next());
assertNotNull(streamScanner.next());
store.closeAndArchiveCompactedFiles(true);
try {
assertNotNull(preadScanner.next());
fail("Expected IOException");
} catch (IOException ex) {
ex.printStackTrace();
}
//Wait a bit for the file to be removed
try {
assertNotNull(streamScanner.next());
fail("Expected IOException");
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
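Together the two tests exercise both sides of the fix: testCleanupOnClose holds a scanner open across a major compaction and asserts that an unassign/assign cycle leaves no compacted storefiles behind, while testIOExceptionThrownOnClose verifies that open pread and stream scanners fail with IOException once closeAndArchiveCompactedFiles(true) has archived the file out from under them. With the standard Surefire setup they can be run individually, e.g. mvn test -Dtest=TestCleanupCompactedFileOnRegionClose, executed from the module containing the test (presumably hbase-server).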