HBASE-20704 Sometimes some compacted storefiles are not archived on region close
This commit is contained in:
parent
966e3751c2
commit
7a1b4d449d
|
@ -924,7 +924,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
storeEngine.getStoreFileManager().clearCompactedFiles();
|
storeEngine.getStoreFileManager().clearCompactedFiles();
|
||||||
// clear the compacted files
|
// clear the compacted files
|
||||||
if (CollectionUtils.isNotEmpty(compactedfiles)) {
|
if (CollectionUtils.isNotEmpty(compactedfiles)) {
|
||||||
removeCompactedfiles(compactedfiles);
|
removeCompactedfiles(compactedfiles, true);
|
||||||
}
|
}
|
||||||
if (!result.isEmpty()) {
|
if (!result.isEmpty()) {
|
||||||
// initialize the thread pool for closing store files in parallel.
|
// initialize the thread pool for closing store files in parallel.
|
||||||
|
@ -2533,6 +2533,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
* Closes and archives the compacted files under this store
|
* Closes and archives the compacted files under this store
|
||||||
*/
|
*/
|
||||||
public synchronized void closeAndArchiveCompactedFiles() throws IOException {
|
public synchronized void closeAndArchiveCompactedFiles() throws IOException {
|
||||||
|
closeAndArchiveCompactedFiles(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException {
|
||||||
// ensure other threads do not attempt to archive the same files on close()
|
// ensure other threads do not attempt to archive the same files on close()
|
||||||
archiveLock.lock();
|
archiveLock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -2551,7 +2556,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
lock.readLock().unlock();
|
lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
|
if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
|
||||||
removeCompactedfiles(copyCompactedfiles);
|
removeCompactedfiles(copyCompactedfiles, storeClosing);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
archiveLock.unlock();
|
archiveLock.unlock();
|
||||||
|
@ -2563,7 +2568,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
* @param compactedfiles The compacted files in this store that are not active in reads
|
* @param compactedfiles The compacted files in this store that are not active in reads
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void removeCompactedfiles(Collection<HStoreFile> compactedfiles)
|
private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean storeClosing)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
|
final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
|
||||||
for (final HStoreFile file : compactedfiles) {
|
for (final HStoreFile file : compactedfiles) {
|
||||||
|
@ -2575,11 +2580,29 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
filesToRemove.add(file);
|
filesToRemove.add(file);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (file.isCompactedAway() && !file.isReferencedInReads()) {
|
|
||||||
|
//Compacted files in the list should always be marked compacted away. In the event
|
||||||
|
//they're contradicting in order to guarantee data consistency
|
||||||
|
//should we choose one and ignore the other?
|
||||||
|
if (storeClosing && !file.isCompactedAway()) {
|
||||||
|
String msg =
|
||||||
|
"Region closing but StoreFile is in compacted list but not compacted away: " +
|
||||||
|
file.getPath().getName();
|
||||||
|
throw new IllegalStateException(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
//If store is closing we're ignoring any references to keep things consistent
|
||||||
|
//and remove compacted storefiles from the region directory
|
||||||
|
if (file.isCompactedAway() && (!file.isReferencedInReads() || storeClosing)) {
|
||||||
|
if (storeClosing && file.isReferencedInReads()) {
|
||||||
|
LOG.debug("Region closing but StoreFile still has references: {}",
|
||||||
|
file.getPath().getName());
|
||||||
|
}
|
||||||
// Even if deleting fails we need not bother as any new scanners won't be
|
// Even if deleting fails we need not bother as any new scanners won't be
|
||||||
// able to use the compacted file as the status is already compactedAway
|
// able to use the compacted file as the status is already compactedAway
|
||||||
LOG.trace("Closing and archiving the file {}", file);
|
LOG.trace("Closing and archiving the file {}", file);
|
||||||
r.close(true);
|
r.close(true);
|
||||||
|
file.closeStreamReaders(true);
|
||||||
// Just close and return
|
// Just close and return
|
||||||
filesToRemove.add(file);
|
filesToRemove.add(file);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2589,8 +2612,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
+ ", skipping for now.");
|
+ ", skipping for now.");
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Exception while trying to close the compacted store file {}",
|
String msg = "Exception while trying to close the compacted store file " +
|
||||||
file.getPath(), e);
|
file.getPath().getName();
|
||||||
|
if (storeClosing) {
|
||||||
|
msg = "Store is closing. " + msg;
|
||||||
|
}
|
||||||
|
LOG.error(msg, e);
|
||||||
|
//if we get an exception let caller know so it can abort the server
|
||||||
|
if (storeClosing) {
|
||||||
|
throw new IOException(msg, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,8 @@ import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.OptionalLong;
|
import java.util.OptionalLong;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -38,10 +40,14 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.util.BloomFilterFactory;
|
import org.apache.hadoop.hbase.util.BloomFilterFactory;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Store data file. Stores usually have one or more of these files. They
|
* A Store data file. Stores usually have one or more of these files. They
|
||||||
|
@ -57,7 +63,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
* writer and a reader is that we write once but read a lot more.
|
* writer and a reader is that we write once but read a lot more.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class HStoreFile implements StoreFile {
|
public class HStoreFile implements StoreFile, StoreFileReader.Listener {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
|
private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
|
||||||
|
|
||||||
|
@ -116,6 +122,9 @@ public class HStoreFile implements StoreFile {
|
||||||
// done.
|
// done.
|
||||||
private final AtomicInteger refCount = new AtomicInteger(0);
|
private final AtomicInteger refCount = new AtomicInteger(0);
|
||||||
|
|
||||||
|
// Set implementation must be of concurrent type
|
||||||
|
private final Set<StoreFileReader> streamReaders;
|
||||||
|
|
||||||
private final boolean noReadahead;
|
private final boolean noReadahead;
|
||||||
|
|
||||||
private final boolean primaryReplica;
|
private final boolean primaryReplica;
|
||||||
|
@ -219,6 +228,7 @@ public class HStoreFile implements StoreFile {
|
||||||
*/
|
*/
|
||||||
public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
|
public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
|
||||||
BloomType cfBloomType, boolean primaryReplica) {
|
BloomType cfBloomType, boolean primaryReplica) {
|
||||||
|
this.streamReaders = ConcurrentHashMap.newKeySet();
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.fileInfo = fileInfo;
|
this.fileInfo = fileInfo;
|
||||||
this.cacheConf = cacheConf;
|
this.cacheConf = cacheConf;
|
||||||
|
@ -502,8 +512,13 @@ public class HStoreFile implements StoreFile {
|
||||||
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
|
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
|
||||||
boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
|
boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false,
|
StoreFileReader reader = createStreamReader(canUseDropBehind);
|
||||||
|
reader.setListener(this);
|
||||||
|
StoreFileScanner sfScanner = reader.getStoreFileScanner(cacheBlocks, false,
|
||||||
isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
|
isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
|
||||||
|
//Add reader once the scanner is created
|
||||||
|
streamReaders.add(reader);
|
||||||
|
return sfScanner;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -523,6 +538,19 @@ public class HStoreFile implements StoreFile {
|
||||||
this.reader.close(evictOnClose);
|
this.reader.close(evictOnClose);
|
||||||
this.reader = null;
|
this.reader = null;
|
||||||
}
|
}
|
||||||
|
closeStreamReaders(evictOnClose);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void closeStreamReaders(boolean evictOnClose) throws IOException {
|
||||||
|
synchronized (this) {
|
||||||
|
for (StoreFileReader entry : streamReaders) {
|
||||||
|
//closing the reader will remove itself from streamReaders thanks to the Listener
|
||||||
|
entry.close(evictOnClose);
|
||||||
|
}
|
||||||
|
int size = streamReaders.size();
|
||||||
|
Preconditions.checkState(size == 0,
|
||||||
|
"There are still streamReaders post close: " + size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -589,4 +617,9 @@ public class HStoreFile implements StoreFile {
|
||||||
TimeRange tr = getReader().timeRange;
|
TimeRange tr = getReader().timeRange;
|
||||||
return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
|
return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeFileReaderClosed(StoreFileReader reader) {
|
||||||
|
streamReaders.remove(reader);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,6 +85,10 @@ public class StoreFileReader {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
final boolean shared;
|
final boolean shared;
|
||||||
|
|
||||||
|
private volatile Listener listener;
|
||||||
|
|
||||||
|
private boolean closed = false;
|
||||||
|
|
||||||
private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
|
private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
|
||||||
this.reader = reader;
|
this.reader = reader;
|
||||||
bloomFilterType = BloomType.NONE;
|
bloomFilterType = BloomType.NONE;
|
||||||
|
@ -209,7 +213,16 @@ public class StoreFileReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close(boolean evictOnClose) throws IOException {
|
public void close(boolean evictOnClose) throws IOException {
|
||||||
reader.close(evictOnClose);
|
synchronized (this) {
|
||||||
|
if (closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
reader.close(evictOnClose);
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
if (listener != null) {
|
||||||
|
listener.storeFileReaderClosed(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -644,4 +657,12 @@ public class StoreFileReader {
|
||||||
void setSkipResetSeqId(boolean skipResetSeqId) {
|
void setSkipResetSeqId(boolean skipResetSeqId) {
|
||||||
this.skipResetSeqId = skipResetSeqId;
|
this.skipResetSeqId = skipResetSeqId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setListener(Listener listener) {
|
||||||
|
this.listener = listener;
|
||||||
|
}
|
||||||
|
|
||||||
|
public interface Listener {
|
||||||
|
void storeFileReaderClosed(StoreFileReader reader);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,210 @@
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
|
import org.apache.hadoop.hbase.client.IsolationLevel;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({MediumTests.class})
|
||||||
|
public class TestCleanupCompactedFileOnRegionClose {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestCleanupCompactedFileOnRegionClose.class);
|
||||||
|
|
||||||
|
private static HBaseTestingUtility util;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeClass() throws Exception {
|
||||||
|
util = new HBaseTestingUtility();
|
||||||
|
util.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,100);
|
||||||
|
util.getConfiguration().set("dfs.blocksize", "64000");
|
||||||
|
util.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
|
||||||
|
util.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY,"0");
|
||||||
|
util.startMiniCluster(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterclass() throws Exception {
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanupOnClose() throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf("testCleanupOnClose");
|
||||||
|
String familyName = "f";
|
||||||
|
byte[] familyNameBytes = Bytes.toBytes(familyName);
|
||||||
|
util.createTable(tableName, familyName);
|
||||||
|
|
||||||
|
HBaseAdmin hBaseAdmin = util.getHBaseAdmin();
|
||||||
|
Table table = util.getConnection().getTable(tableName);
|
||||||
|
|
||||||
|
HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
|
||||||
|
Region region = rs.getRegions(tableName).get(0);
|
||||||
|
|
||||||
|
int refSFCount = 4;
|
||||||
|
for (int i = 0; i < refSFCount; i++) {
|
||||||
|
for (int j = 0; j < refSFCount; j++) {
|
||||||
|
Put put = new Put(Bytes.toBytes(j));
|
||||||
|
put.addColumn(familyNameBytes, Bytes.toBytes(i), Bytes.toBytes(j));
|
||||||
|
table.put(put);
|
||||||
|
}
|
||||||
|
util.flush(tableName);
|
||||||
|
}
|
||||||
|
assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
|
||||||
|
|
||||||
|
//add a delete, to test wether we end up with an inconsistency post region close
|
||||||
|
Delete delete = new Delete(Bytes.toBytes(refSFCount-1));
|
||||||
|
table.delete(delete);
|
||||||
|
util.flush(tableName);
|
||||||
|
assertFalse(table.exists(new Get(Bytes.toBytes(refSFCount-1))));
|
||||||
|
|
||||||
|
//Create a scanner and keep it open to add references to StoreFileReaders
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setStopRow(Bytes.toBytes(refSFCount-2));
|
||||||
|
scan.setCaching(1);
|
||||||
|
ResultScanner scanner = table.getScanner(scan);
|
||||||
|
Result res = scanner.next();
|
||||||
|
assertNotNull(res);
|
||||||
|
assertEquals(refSFCount, res.getFamilyMap(familyNameBytes).size());
|
||||||
|
|
||||||
|
|
||||||
|
//Verify the references
|
||||||
|
int count = 0;
|
||||||
|
for (HStoreFile sf : (Collection<HStoreFile>)region.getStore(familyNameBytes).getStorefiles()) {
|
||||||
|
synchronized (sf) {
|
||||||
|
if (count < refSFCount) {
|
||||||
|
assertTrue(sf.isReferencedInReads());
|
||||||
|
} else {
|
||||||
|
assertFalse(sf.isReferencedInReads());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Major compact to produce compacted storefiles that need to be cleaned up
|
||||||
|
util.compact(tableName, true);
|
||||||
|
assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
|
||||||
|
assertEquals(refSFCount+1,
|
||||||
|
((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
|
||||||
|
.getCompactedfiles().size());
|
||||||
|
|
||||||
|
//close then open the region to determine wether compacted storefiles get cleaned up on close
|
||||||
|
hBaseAdmin.unassign(region.getRegionInfo().getRegionName(), false);
|
||||||
|
hBaseAdmin.assign(region.getRegionInfo().getRegionName());
|
||||||
|
util.waitUntilNoRegionsInTransition(10000);
|
||||||
|
|
||||||
|
|
||||||
|
assertFalse("Deleted row should not exist",
|
||||||
|
table.exists(new Get(Bytes.toBytes(refSFCount-1))));
|
||||||
|
|
||||||
|
rs = util.getRSForFirstRegionInTable(tableName);
|
||||||
|
region = rs.getRegions(tableName).get(0);
|
||||||
|
assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
|
||||||
|
assertEquals(0,
|
||||||
|
((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
|
||||||
|
.getCompactedfiles().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIOExceptionThrownOnClose() throws Exception {
|
||||||
|
byte[] filler = new byte[128000];
|
||||||
|
TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
|
||||||
|
String familyName = "f";
|
||||||
|
byte[] familyNameBytes = Bytes.toBytes(familyName);
|
||||||
|
util.createTable(tableName, familyName);
|
||||||
|
|
||||||
|
Table table = util.getConnection().getTable(tableName);
|
||||||
|
|
||||||
|
HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
|
||||||
|
Region region = rs.getRegions(tableName).get(0);
|
||||||
|
|
||||||
|
int refSFCount = 4;
|
||||||
|
for (int i = 0; i < refSFCount; i++) {
|
||||||
|
for (int j = 0; j < refSFCount; j++) {
|
||||||
|
Put put = new Put(Bytes.toBytes(j));
|
||||||
|
put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
|
||||||
|
table.put(put);
|
||||||
|
}
|
||||||
|
util.flush(tableName);
|
||||||
|
}
|
||||||
|
assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
|
||||||
|
|
||||||
|
HStore store = ((HRegion) region).getStore(familyNameBytes);
|
||||||
|
HStoreFile hsf = ((Collection<HStoreFile>)region.getStore(familyNameBytes).getStorefiles())
|
||||||
|
.iterator().next();
|
||||||
|
long readPt = ((HRegion)region).getReadPoint(IsolationLevel.READ_COMMITTED);
|
||||||
|
StoreFileScanner preadScanner = hsf.getPreadScanner(false, readPt, 0, false);
|
||||||
|
StoreFileScanner streamScanner =
|
||||||
|
hsf.getStreamScanner(false, false, false, readPt, 0, false);
|
||||||
|
preadScanner.seek(KeyValue.LOWESTKEY);
|
||||||
|
streamScanner.seek(KeyValue.LOWESTKEY);
|
||||||
|
|
||||||
|
//Major compact to produce compacted storefiles that need to be cleaned up
|
||||||
|
util.compact(tableName, true);
|
||||||
|
assertNotNull(preadScanner.next());
|
||||||
|
assertNotNull(streamScanner.next());
|
||||||
|
store.closeAndArchiveCompactedFiles(true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertNotNull(preadScanner.next());
|
||||||
|
fail("Expected IOException");
|
||||||
|
}catch (IOException ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Wait a bit for file to be remove from
|
||||||
|
try {
|
||||||
|
assertNotNull(streamScanner.next());
|
||||||
|
fail("Expected IOException");
|
||||||
|
} catch (IOException ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue