HBASE-20704 Sometimes some compacted storefiles are not archived on region close, branch-1 backport

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Amending-Author: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Francis Liu 2018-09-18 19:17:20 -07:00 committed by Andrew Purtell
parent bb529b706f
commit 5f8de7314c
No known key found for this signature in database
GPG Key ID: 8597754DD5365CCD
4 changed files with 246 additions and 20 deletions

View File

@ -22,6 +22,7 @@ import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
@ -215,18 +216,12 @@ public class FSDataInputStreamWrapper {
/** Close stream(s) if necessary. */
public void close() throws IOException {
if (!doCloseStreams) return;
try {
if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
streamNoFsChecksum.close();
streamNoFsChecksum = null;
}
} finally {
if (stream != null) {
stream.close();
stream = null;
}
if (!doCloseStreams) {
return;
}
// we do not care about the close exception as it is for reading, no data loss issue.
IOUtils.closeQuietly(streamNoFsChecksum);
IOUtils.closeQuietly(stream);
}
public HFileSystem getHfs() {

View File

@ -891,7 +891,7 @@ public class HStore implements Store {
storeEngine.getStoreFileManager().clearCompactedFiles();
// clear the compacted files
if (compactedfiles != null && !compactedfiles.isEmpty()) {
removeCompactedfiles(compactedfiles);
removeCompactedfiles(compactedfiles, true);
}
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
@ -2751,6 +2751,11 @@ public class HStore implements Store {
@Override
public synchronized void closeAndArchiveCompactedFiles() throws IOException {
closeAndArchiveCompactedFiles(false);
}
@VisibleForTesting
public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException {
// ensure other threads do not attempt to archive the same files on close()
archiveLock.lock();
try {
@ -2772,7 +2777,7 @@ public class HStore implements Store {
lock.readLock().unlock();
}
if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
removeCompactedfiles(copyCompactedfiles);
removeCompactedfiles(copyCompactedfiles, storeClosing);
}
} finally {
archiveLock.unlock();
@ -2784,20 +2789,38 @@ public class HStore implements Store {
* @param compactedfiles The compacted files in this store that are not active in reads
* @throws IOException
*/
private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
private void removeCompactedfiles(Collection<StoreFile> compactedfiles, boolean storeClosing)
throws IOException {
final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
for (final StoreFile file : compactedfiles) {
synchronized (file) {
try {
StoreFile.Reader r = file.getReader();
//Compacted files in the list should always be marked compacted away. In the event
//they're contradicting in order to guarantee data consistency
//should we choose one and ignore the other?
if (storeClosing && r != null && !r.isCompactedAway()) {
String msg =
"Region closing but StoreFile is in compacted list but not compacted away: " +
file.getPath();
throw new IllegalStateException(msg);
}
if (r == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("The file " + file + " was closed but still not archived.");
}
filesToRemove.add(file);
}
if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
//If store is closing we're ignoring any references to keep things consistent
//and remove compacted storefiles from the region directory
if (r != null && file.isCompactedAway() && (!r.isReferencedInReads() || storeClosing)) {
if (storeClosing && r.isReferencedInReads()) {
LOG.warn("Region closing but StoreFile still has references: file=" +
file.getPath() + ", refCount=" + r.getRefCount());
}
// Even if deleting fails we need not bother as any new scanners won't be
// able to use the compacted file as the status is already compactedAway
if (LOG.isTraceEnabled()) {
@ -2808,13 +2831,21 @@ public class HStore implements Store {
filesToRemove.add(file);
} else {
LOG.info("Can't archive compacted file " + file.getPath()
+ " because of either isCompactedAway = " + r.isCompactedAway()
+ " or file has reference, isReferencedInReads = " + r.isReferencedInReads()
+ ", skipping for now.");
+ " because of either isCompactedAway=" + r.isCompactedAway()
+ " or file has reference, isReferencedInReads=" + r.isReferencedInReads()
+ ", refCount=" + r.getRefCount() + ", skipping for now.");
}
} catch (Exception e) {
LOG.error(
"Exception while trying to close the compacted store file " + file.getPath().getName());
String msg = "Exception while trying to close the compacted store file " +
file.getPath();
if (storeClosing) {
msg = "Store is closing. " + msg;
}
LOG.error(msg, e);
//if we get an exception let caller know so it can abort the server
if (storeClosing) {
throw new IOException(msg, e);
}
}
}
}

View File

@ -1223,6 +1223,14 @@ public class StoreFile {
reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
}
/**
* Return the ref count associated with the reader whenever a scanner associated with the
* reader is opened.
*/
int getRefCount() {
return refCount.get();
}
/**
* Increment the ref count associated with the reader when ever a scanner associated with the
* reader is opened

View File

@ -0,0 +1,192 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static junit.framework.TestCase.assertNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({MediumTests.class})
public class TestCleanupCompactedFileOnRegionClose {
private static HBaseTestingUtility util;
@BeforeClass
public static void beforeClass() throws Exception {
util = new HBaseTestingUtility();
util.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,100);
util.getConfiguration().set("dfs.blocksize", "64000");
util.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
util.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY,"0");
util.startMiniCluster(2);
}
@AfterClass
public static void afterclass() throws Exception {
util.shutdownMiniCluster();
}
@Test
public void testCleanupOnClose() throws Exception {
TableName tableName = TableName.valueOf("testCleanupOnClose");
String familyName = "f";
byte[] familyNameBytes = Bytes.toBytes(familyName);
util.createTable(tableName, familyName);
HBaseAdmin hBaseAdmin = util.getHBaseAdmin();
Table table = util.getConnection().getTable(tableName);
HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
Region region = rs.getOnlineRegions(tableName).get(0);
int refSFCount = 4;
for (int i = 0; i < refSFCount; i++) {
for (int j = 0; j < refSFCount; j++) {
Put put = new Put(Bytes.toBytes(j));
put.addColumn(familyNameBytes, Bytes.toBytes(i), Bytes.toBytes(j));
table.put(put);
}
util.flush(tableName);
}
assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
//add a delete, to test wether we end up with an inconsistency post region close
Delete delete = new Delete(Bytes.toBytes(refSFCount-1));
table.delete(delete);
util.flush(tableName);
assertFalse(table.exists(new Get(Bytes.toBytes(refSFCount-1))));
//Create a scanner and keep it open to add references to StoreFileReaders
Scan scan = new Scan();
scan.setStopRow(Bytes.toBytes(refSFCount-2));
scan.setCaching(1);
ResultScanner scanner = table.getScanner(scan);
Result res = scanner.next();
assertNotNull(res);
assertEquals(refSFCount, res.getFamilyMap(familyNameBytes).size());
//Verify the references
int count = 0;
for (StoreFile sf : (Collection<StoreFile>)region.getStore(familyNameBytes).getStorefiles()) {
synchronized (sf) {
if (count < refSFCount) {
assertTrue(sf.getReader().isReferencedInReads());
} else {
assertFalse(sf.getReader().isReferencedInReads());
}
}
count++;
}
//Major compact to produce compacted storefiles that need to be cleaned up
util.compact(tableName, true);
assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
assertEquals(refSFCount+1,
((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
.getCompactedfiles().size());
//close then open the region to determine wether compacted storefiles get cleaned up on close
hBaseAdmin.unassign(region.getRegionInfo().getRegionName(), false);
hBaseAdmin.assign(region.getRegionInfo().getRegionName());
util.waitUntilNoRegionsInTransition(10000);
assertFalse("Deleted row should not exist",
table.exists(new Get(Bytes.toBytes(refSFCount-1))));
rs = util.getRSForFirstRegionInTable(tableName);
region = rs.getOnlineRegions(tableName).get(0);
assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
assertNull(((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
.getCompactedfiles());
}
@Test
public void testIOExceptionThrownOnClose() throws Exception {
byte[] filler = new byte[128000];
TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
String familyName = "f";
byte[] familyNameBytes = Bytes.toBytes(familyName);
util.createTable(tableName, familyName);
Table table = util.getConnection().getTable(tableName);
HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
Region region = rs.getOnlineRegions(tableName).get(0);
int refSFCount = 4;
for (int i = 0; i < refSFCount; i++) {
for (int j = 0; j < refSFCount; j++) {
Put put = new Put(Bytes.toBytes(j));
put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
table.put(put);
}
util.flush(tableName);
}
assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
HStore store = (HStore)region.getStore(familyNameBytes);
StoreFile hsf = region.getStore(familyNameBytes).getStorefiles().iterator().next();
long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
StoreFileScanner preadScanner = hsf.getReader().getStoreFileScanner(
false, true, false, readPt, 0, false);
preadScanner.seek(KeyValue.LOWESTKEY);
//Major compact to produce compacted storefiles that need to be cleaned up
util.compact(tableName, true);
assertNotNull(preadScanner.next());
store.closeAndArchiveCompactedFiles(true);
try {
assertNotNull(preadScanner.next());
fail("Expected IOException");
}catch (IOException ex) {
ex.printStackTrace();
}
}
}