HBASE-20724 Sometimes some compacted storefiles are still opened after region failover

This commit is contained in:
Guanghao Zhang 2019-02-19 20:50:37 +08:00
parent ea0675eb96
commit e412e036e1
15 changed files with 388 additions and 238 deletions

View File

@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@ -163,6 +164,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HFileProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
@ -3265,4 +3267,27 @@ public final class ProtobufUtil {
.setTo(timeRange.getMax())
.build();
}
public static byte[] toCompactionEventTrackerBytes(Set<String> storeFiles) {
HFileProtos.CompactionEventTracker.Builder builder =
HFileProtos.CompactionEventTracker.newBuilder();
storeFiles.forEach(sf -> builder.addCompactedStoreFile(ByteString.copyFromUtf8(sf)));
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
}
public static Set<String> toCompactedStoreFiles(byte[] bytes) throws IOException {
if (bytes != null && ProtobufUtil.isPBMagicPrefix(bytes)) {
int pbLen = ProtobufUtil.lengthOfPBMagic();
HFileProtos.CompactionEventTracker.Builder builder =
HFileProtos.CompactionEventTracker.newBuilder();
ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
HFileProtos.CompactionEventTracker compactionEventTracker = builder.build();
List<ByteString> compactedStoreFiles = compactionEventTracker.getCompactedStoreFileList();
if (compactedStoreFiles != null && compactedStoreFiles.size() != 0) {
return compactedStoreFiles.stream().map(ByteString::toStringUtf8)
.collect(Collectors.toSet());
}
}
return Collections.emptySet();
}
}

View File

@ -27,6 +27,10 @@ option optimize_for = SPEED;
import "HBase.proto";
message CompactionEventTracker {
repeated bytes compacted_store_file = 1;
}
// Map of name/values
message FileInfoProto {
repeated BytesBytesPair map_entry = 1;

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
@ -62,18 +63,24 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
* comments in HBASE-15400 for more details.
*/
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET);
}
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
Collection<HStoreFile> storeFiles) throws IOException {
preCommitWriters();
Collection<StoreFileWriter> writers = this.writers();
if (LOG.isDebugEnabled()) {
LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId
+ ", majorCompaction=" + majorCompaction);
LOG.debug(
"Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId + ", majorCompaction=" +
majorCompaction);
}
List<Path> paths = new ArrayList<>();
for (StoreFileWriter writer : writers) {
if (writer == null) {
continue;
}
writer.appendMetadata(maxSeqId, majorCompaction);
writer.appendMetadata(maxSeqId, majorCompaction, storeFiles);
preCloseWriter(writer);
paths.add(writer.getPath());
writer.close();

View File

@ -574,6 +574,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
totalValidStoreFile++;
}
Set<String> compactedStoreFiles = new HashSet<>();
ArrayList<HStoreFile> results = new ArrayList<>(files.size());
IOException ioe = null;
try {
@ -583,6 +584,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
if (storeFile != null) {
LOG.debug("loaded {}", storeFile);
results.add(storeFile);
compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
}
} catch (InterruptedException e) {
if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
@ -609,6 +611,21 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
throw ioe;
}
// Remove the compacted files from result
List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
for (HStoreFile storeFile : results) {
if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
LOG.warn("Clearing the compacted storefile {} from this store", storeFile);
storeFile.getReader().close(true);
filesToRemove.add(storeFile);
}
}
results.removeAll(filesToRemove);
if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
LOG.debug("Moving the files {} to archive", filesToRemove);
this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);
}
return results;
}
@ -930,7 +947,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
storeEngine.getStoreFileManager().clearCompactedFiles();
// clear the compacted files
if (CollectionUtils.isNotEmpty(compactedfiles)) {
removeCompactedfiles(compactedfiles, true);
removeCompactedfiles(compactedfiles);
}
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
@ -1115,7 +1132,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
.withMaxKeyCount(maxKeyCount)
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
.withShouldDropCacheBehind(shouldDropBehind);
.withShouldDropCacheBehind(shouldDropBehind)
.withCompactedFilesSupplier(this::getCompactedFiles);
return builder.build();
}
@ -2528,11 +2546,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* Closes and archives the compacted files under this store
*/
public synchronized void closeAndArchiveCompactedFiles() throws IOException {
closeAndArchiveCompactedFiles(false);
}
@VisibleForTesting
public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException {
// ensure other threads do not attempt to archive the same files on close()
archiveLock.lock();
try {
@ -2551,7 +2564,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
lock.readLock().unlock();
}
if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
removeCompactedfiles(copyCompactedfiles, storeClosing);
removeCompactedfiles(copyCompactedfiles);
}
} finally {
archiveLock.unlock();
@ -2563,7 +2576,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* @param compactedfiles The compacted files in this store that are not active in reads
* @throws IOException
*/
private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean storeClosing)
private void removeCompactedfiles(Collection<HStoreFile> compactedfiles)
throws IOException {
final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
for (final HStoreFile file : compactedfiles) {
@ -2576,28 +2589,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
continue;
}
//Compacted files in the list should always be marked compacted away. In the event
//they're contradicting in order to guarantee data consistency
//should we choose one and ignore the other?
if (storeClosing && !file.isCompactedAway()) {
String msg =
"Region closing but StoreFile is in compacted list but not compacted away: " +
file.getPath();
throw new IllegalStateException(msg);
}
//If store is closing we're ignoring any references to keep things consistent
//and remove compacted storefiles from the region directory
if (file.isCompactedAway() && (!file.isReferencedInReads() || storeClosing)) {
if (storeClosing && file.isReferencedInReads()) {
LOG.warn("Region closing but StoreFile still has references: file={}, refCount={}",
file.getPath(), r.getRefCount());
}
if (file.isCompactedAway() && !file.isReferencedInReads()) {
// Even if deleting fails we need not bother as any new scanners won't be
// able to use the compacted file as the status is already compactedAway
LOG.trace("Closing and archiving the file {}", file);
r.close(true);
file.closeStreamReaders(true);
// Just close and return
filesToRemove.add(file);
} else {
@ -2607,16 +2603,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
+ ", refCount=" + r.getRefCount() + ", skipping for now.");
}
} catch (Exception e) {
String msg = "Exception while trying to close the compacted store file " +
file.getPath();
if (storeClosing) {
msg = "Store is closing. " + msg;
}
LOG.error(msg, e);
//if we get an exception let caller know so it can abort the server
if (storeClosing) {
throw new IOException(msg, e);
}
LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),
e);
}
}
}
@ -2672,6 +2660,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
}
@Override
public int getCurrentParallelPutCount() {
return currentParallelPutCount.get();
}

View File

@ -20,11 +20,11 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -40,14 +40,13 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
* A Store data file. Stores usually have one or more of these files. They
@ -63,7 +62,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
* writer and a reader is that we write once but read a lot more.
*/
@InterfaceAudience.Private
public class HStoreFile implements StoreFile, StoreFileReader.Listener {
public class HStoreFile implements StoreFile {
private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
@ -83,6 +82,11 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
/**
* Key for compaction event which contains the compacted storefiles in FileInfo
*/
public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");
/** Bloom filter Type in FileInfo */
public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
@ -125,10 +129,6 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
// done.
private final AtomicInteger refCount = new AtomicInteger(0);
// Set implementation must be of concurrent type
@VisibleForTesting
final Set<StoreFileReader> streamReaders;
private final boolean noReadahead;
private final boolean primaryReplica;
@ -183,6 +183,9 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
// It's set whenever you get a Reader.
private boolean excludeFromMinorCompaction = false;
// This file was product of these compacted store files
private final Set<String> compactedStoreFiles = new HashSet<>();
/**
* Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
* after which it is not modified again.
@ -232,7 +235,6 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
*/
public HStoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf,
BloomType cfBloomType, boolean primaryReplica) {
this.streamReaders = ConcurrentHashMap.newKeySet();
this.fs = fs;
this.fileInfo = fileInfo;
this.cacheConf = cacheConf;
@ -359,7 +361,6 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
/**
* Opens reader on this store file. Called by Constructor.
* @throws IOException
* @see #closeStoreFile(boolean)
*/
private void open() throws IOException {
@ -464,6 +465,14 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
"proceeding without", e);
this.reader.timeRange = null;
}
try {
byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
} catch (IOException e) {
LOG.error("Error reading compacted storefiles from meta data", e);
}
// initialize so we can reuse them after reader closed.
firstKey = reader.getFirstKey();
lastKey = reader.getLastKey();
@ -516,13 +525,9 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
throws IOException {
StoreFileReader reader = createStreamReader(canUseDropBehind);
reader.setListener(this);
StoreFileScanner sfScanner = reader.getStoreFileScanner(cacheBlocks, false,
isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
//Add reader once the scanner is created
streamReaders.add(reader);
return sfScanner;
return createStreamReader(canUseDropBehind)
.getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
canOptimizeForNonNullColumn);
}
/**
@ -542,19 +547,6 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
this.reader.close(evictOnClose);
this.reader = null;
}
closeStreamReaders(evictOnClose);
}
public void closeStreamReaders(boolean evictOnClose) throws IOException {
synchronized (this) {
for (StoreFileReader entry : streamReaders) {
//closing the reader will remove itself from streamReaders thanks to the Listener
entry.close(evictOnClose);
}
int size = streamReaders.size();
Preconditions.checkState(size == 0,
"There are still streamReaders post close: " + size);
}
}
/**
@ -622,8 +614,7 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
return tr != null ? OptionalLong.of(tr.getMax()) : OptionalLong.empty();
}
@Override
public void storeFileReaderClosed(StoreFileReader reader) {
streamReaders.remove(reader);
Set<String> getCompactedStoreFiles() {
return Collections.unmodifiableSet(this.compactedStoreFiles);
}
}

View File

@ -87,10 +87,6 @@ public class StoreFileReader {
@VisibleForTesting
final boolean shared;
private volatile Listener listener;
private boolean closed = false;
private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
this.reader = reader;
bloomFilterType = BloomType.NONE;
@ -184,9 +180,6 @@ public class StoreFileReader {
if (!shared) {
try {
reader.close(false);
if (this.listener != null) {
this.listener.storeFileReaderClosed(this);
}
} catch (IOException e) {
LOG.warn("failed to close stream reader", e);
}
@ -227,16 +220,7 @@ public class StoreFileReader {
}
public void close(boolean evictOnClose) throws IOException {
synchronized (this) {
if (closed) {
return;
}
reader.close(evictOnClose);
closed = true;
}
if (listener != null) {
listener.storeFileReaderClosed(this);
}
}
/**
@ -709,14 +693,6 @@ public class StoreFileReader {
this.skipResetSeqId = skipResetSeqId;
}
public void setListener(Listener listener) {
this.listener = listener;
}
public interface Listener {
void storeFileReaderClosed(StoreFileReader reader);
}
public int getPrefixLength() {
return prefixLength;
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
@ -28,8 +29,14 @@ import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -55,8 +62,11 @@ import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
* A StoreFile writer. Use this to read/write HBase Store Files. It is package
* local because it is an implementation detail of the HBase regionserver.
@ -74,11 +84,13 @@ public class StoreFileWriter implements CellSink, ShipperListener {
private BloomContext bloomContext = null;
private BloomContext deleteFamilyBloomContext = null;
private final TimeRangeTracker timeRangeTracker;
private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
protected HFile.Writer writer;
/**
* Creates an HFile.Writer that also write helpful meta data.
*
* @param fs file system to write to
* @param path file name to create
* @param conf user configuration
@ -86,18 +98,17 @@ public class StoreFileWriter implements CellSink, ShipperListener {
* @param bloomType bloom filter setting
* @param maxKeys the expected maximum number of keys to be added. Was used
* for Bloom filter size in {@link HFile} format version 1.
* @param favoredNodes
* @param fileContext - The HFile context
* @param favoredNodes an array of favored nodes or possibly null
* @param fileContext The HFile context
* @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
* @param compactedFilesSupplier Returns the {@link HStore} compacted files which not archived
* @throws IOException problem writing to FS
*/
private StoreFileWriter(FileSystem fs, Path path,
final Configuration conf,
CacheConfig cacheConf,
private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
final CellComparator comparator, BloomType bloomType, long maxKeys,
InetSocketAddress[] favoredNodes, HFileContext fileContext,
boolean shouldDropCacheBehind)
throws IOException {
InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind,
Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException {
this.compactedFilesSupplier = compactedFilesSupplier;
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
// TODO : Change all writers to be specifically created for compaction context
writer = HFile.getWriterFactory(conf, cacheConf)
@ -167,11 +178,54 @@ public class StoreFileWriter implements CellSink, ShipperListener {
*/
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
throws IOException {
appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet());
}
/**
* Writes meta data.
* Call before {@link #close()} since its written as meta data to this file.
* @param maxSequenceId Maximum sequence id.
* @param majorCompaction True if this file is product of a major compaction
* @param storeFiles The compacted store files to generate this new file
* @throws IOException problem writing to FS
*/
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
final Collection<HStoreFile> storeFiles) throws IOException {
writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
appendTrackedTimestampsToMetadata();
}
/**
* Used when write {@link HStoreFile#COMPACTION_EVENT_KEY} to new file's file info. The compacted
* store files's name is needed. But if the compacted store file is a result of compaction, it's
* compacted files which still not archived is needed, too. And don't need to add compacted files
* recursively. If file A, B, C compacted to new file D, and file D compacted to new file E, will
* write A, B, C, D to file E's compacted files. So if file E compacted to new file F, will add E
* to F's compacted files first, then add E's compacted files: A, B, C, D to it. And no need to
* add D's compacted file, as D's compacted files has been in E's compacted files, too.
* See HBASE-20724 for more details.
*
* @param storeFiles The compacted store files to generate this new file
* @return bytes of CompactionEventTracker
*/
private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
Set<String> notArchivedCompactedStoreFiles =
this.compactedFilesSupplier.get().stream().map(sf -> sf.getPath().getName())
.collect(Collectors.toSet());
Set<String> compactedStoreFiles = new HashSet<>();
for (HStoreFile storeFile : storeFiles) {
compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
for (String csf : storeFile.getCompactedStoreFiles()) {
if (notArchivedCompactedStoreFiles.contains(csf)) {
compactedStoreFiles.add(csf);
}
}
}
return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
}
/**
* Writes meta data.
* Call before {@link #close()} since its written as meta data to this file.
@ -368,6 +422,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
private InetSocketAddress[] favoredNodes;
private HFileContext fileContext;
private boolean shouldDropCacheBehind;
private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
public Builder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
@ -449,6 +504,12 @@ public class StoreFileWriter implements CellSink, ShipperListener {
return this;
}
public Builder withCompactedFilesSupplier(
Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
this.compactedFilesSupplier = compactedFilesSupplier;
return this;
}
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
@ -487,9 +548,9 @@ public class StoreFileWriter implements CellSink, ShipperListener {
if (comparator == null) {
comparator = CellComparator.getInstance();
}
return new StoreFileWriter(fs, filePath,
conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext,
shouldDropCacheBehind);
return new StoreFileWriter(fs, filePath, conf, cacheConf, comparator, bloomType, maxKeyCount,
favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
}
}
}

View File

@ -78,6 +78,6 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
@Override
protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
}
}

View File

@ -86,7 +86,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
writer.close();
return newFiles;
}

View File

@ -127,7 +127,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
@Override
protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles;
}

View File

@ -405,10 +405,11 @@ public class TestSpaceQuotasWithSnapshots {
// Compact the cloned table to force it to own its own files.
TEST_UTIL.compact(tn2, true);
// After the table is compacted, it should have its own files and be the same size as originally
// But The compaction result file has an additional compaction event tracker
TEST_UTIL.waitFor(30_000, 1_000, new SpaceQuotaSnapshotPredicate(conn, tn2) {
@Override
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() == actualInitialSize;
return snapshot.getUsage() >= actualInitialSize;
}
});
}

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({LargeTests.class})
public class TestCleanupCompactedFileAfterFailover {
private static final Logger LOG =
LoggerFactory.getLogger(TestCleanupCompactedFileAfterFailover.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCleanupCompactedFileAfterFailover.class);
private static HBaseTestingUtility TEST_UTIL;
private static Admin admin;
private static Table table;
private static TableName TABLE_NAME = TableName.valueOf("TestCleanupCompactedFileAfterFailover");
private static byte[] ROW = Bytes.toBytes("row");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static byte[] VALUE = Bytes.toBytes("value");
private static final int RS_NUMBER = 5;
@BeforeClass
public static void beforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
// Set the scanner lease to 20min, so the scanner can't be closed by RegionServer
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000);
TEST_UTIL.getConfiguration()
.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
TEST_UTIL.getConfiguration().set("dfs.blocksize", "64000");
TEST_UTIL.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
TEST_UTIL.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY, "0");
TEST_UTIL.startMiniCluster(RS_NUMBER);
admin = TEST_UTIL.getAdmin();
}
@AfterClass
public static void afterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void before() throws Exception {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build());
TEST_UTIL.waitTableAvailable(TABLE_NAME);
table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
}
@After
public void after() throws Exception {
admin.disableTable(TABLE_NAME);
admin.deleteTable(TABLE_NAME);
}
@Test
public void testCleanupAfterFailoverWithCompactOnce() throws Exception {
testCleanupAfterFailover(1);
}
@Test
public void testCleanupAfterFailoverWithCompactTwice() throws Exception {
testCleanupAfterFailover(2);
}
@Test
public void testCleanupAfterFailoverWithCompactThreeTimes() throws Exception {
testCleanupAfterFailover(3);
}
private void testCleanupAfterFailover(int compactNum) throws Exception {
HRegionServer rsServedTable = null;
List<HRegion> regions = new ArrayList<>();
for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
.getLiveRegionServerThreads()) {
HRegionServer rs = rsThread.getRegionServer();
if (rs.getOnlineTables().contains(TABLE_NAME)) {
regions.addAll(rs.getRegions(TABLE_NAME));
rsServedTable = rs;
}
}
assertNotNull(rsServedTable);
assertEquals("Table should only have one region", 1, regions.size());
HRegion region = regions.get(0);
HStore store = region.getStore(FAMILY);
writeDataAndFlush(3, region);
assertEquals(3, store.getStorefilesCount());
// Open a scanner and not close, then the storefile will be referenced
store.getScanner(new Scan(), null, 0);
region.compact(true);
assertEquals(1, store.getStorefilesCount());
// The compacted file should not be archived as there are references by user scanner
assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
for (int i = 1; i < compactNum; i++) {
// Compact again
region.compact(true);
assertEquals(1, store.getStorefilesCount());
store.closeAndArchiveCompactedFiles();
// Compacted storefiles still be 3 as the new compacted storefile was archived
assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
}
int walNum = rsServedTable.getWALs().size();
// Roll WAL
rsServedTable.walRoller.requestRollAll();
// Flush again
region.flush(true);
// The WAL which contains compaction event marker should be archived
assertEquals("The old WAL should be archived", walNum, rsServedTable.getWALs().size());
rsServedTable.kill();
// Sleep to wait failover
Thread.sleep(3000);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
regions.clear();
for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
.getLiveRegionServerThreads()) {
HRegionServer rs = rsThread.getRegionServer();
if (rs != rsServedTable && rs.getOnlineTables().contains(TABLE_NAME)) {
regions.addAll(rs.getRegions(TABLE_NAME));
}
}
assertEquals("Table should only have one region", 1, regions.size());
region = regions.get(0);
store = region.getStore(FAMILY);
// The compacted storefile should be cleaned and only have 1 storefile
assertEquals(1, store.getStorefilesCount());
}
private void writeDataAndFlush(int fileNum, HRegion region) throws Exception {
for (int i = 0; i < fileNum; i++) {
for (int j = 0; j < 100; j++) {
table.put(new Put(concat(ROW, j)).addColumn(FAMILY, QUALIFIER, concat(VALUE, j)));
}
region.flush(true);
}
}
private byte[] concat(byte[] base, int index) {
return Bytes.toBytes(Bytes.toString(base) + "-" + index);
}
}

View File

@ -22,19 +22,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -151,60 +147,4 @@ public class TestCleanupCompactedFileOnRegionClose {
((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
.getCompactedfiles().size());
}
@Test
public void testIOExceptionThrownOnClose() throws Exception {
byte[] filler = new byte[128000];
TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
String familyName = "f";
byte[] familyNameBytes = Bytes.toBytes(familyName);
util.createTable(tableName, familyName);
Table table = util.getConnection().getTable(tableName);
HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
Region region = rs.getRegions(tableName).get(0);
int refSFCount = 4;
for (int i = 0; i < refSFCount; i++) {
for (int j = 0; j < refSFCount; j++) {
Put put = new Put(Bytes.toBytes(j));
put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
table.put(put);
}
util.flush(tableName);
}
assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
HStore store = ((HRegion) region).getStore(familyNameBytes);
HStoreFile hsf = ((Collection<HStoreFile>)region.getStore(familyNameBytes).getStorefiles())
.iterator().next();
long readPt = ((HRegion)region).getReadPoint(IsolationLevel.READ_COMMITTED);
StoreFileScanner preadScanner = hsf.getPreadScanner(false, readPt, 0, false);
StoreFileScanner streamScanner =
hsf.getStreamScanner(false, false, false, readPt, 0, false);
preadScanner.seek(KeyValue.LOWESTKEY);
streamScanner.seek(KeyValue.LOWESTKEY);
//Major compact to produce compacted storefiles that need to be cleaned up
util.compact(tableName, true);
assertNotNull(preadScanner.next());
assertNotNull(streamScanner.next());
store.closeAndArchiveCompactedFiles(true);
try {
assertNotNull(preadScanner.next());
fail("Expected IOException");
}catch (IOException ex) {
ex.printStackTrace();
}
//Wait a bit for file to be remove from
try {
assertNotNull(streamScanner.next());
fail("Expected IOException");
} catch (IOException ex) {
ex.printStackTrace();
}
}
}

View File

@ -23,12 +23,8 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -38,7 +34,6 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
@ -48,7 +43,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
@ -105,49 +99,6 @@ public class TestSwitchToStreamRead {
UTIL.cleanupTestDir();
}
private Set<StoreFileReader> getStreamReaders() {
List<HStore> stores = REGION.getStores();
Assert.assertEquals(1, stores.size());
HStore firstStore = stores.get(0);
Assert.assertNotNull(firstStore);
Collection<HStoreFile> storeFiles = firstStore.getStorefiles();
Assert.assertEquals(1, storeFiles.size());
HStoreFile firstSToreFile = storeFiles.iterator().next();
Assert.assertNotNull(firstSToreFile);
return Collections.unmodifiableSet(firstSToreFile.streamReaders);
}
/**
* Test Case for HBASE-21551
*/
@Test
public void testStreamReadersCleanup() throws IOException {
Set<StoreFileReader> streamReaders = getStreamReaders();
Assert.assertEquals(0, getStreamReaders().size());
try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) {
StoreScanner storeScanner =
(StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream()
.filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs)
.collect(Collectors.toList());
Assert.assertEquals(1, sfScanners.size());
StoreFileScanner sfScanner = sfScanners.get(0);
Assert.assertFalse(sfScanner.getReader().shared);
// There should be a stream reader
Assert.assertEquals(1, getStreamReaders().size());
}
Assert.assertEquals(0, getStreamReaders().size());
// The streamsReader should be clear after region close even if there're some opened stream
// scanner.
RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM));
Assert.assertNotNull(scanner);
Assert.assertEquals(1, getStreamReaders().size());
REGION.close();
Assert.assertEquals(0, streamReaders.size());
}
@Test
public void test() throws IOException {
try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
@ -111,6 +112,13 @@ public class TestCompactor {
return null;
}
}).when(writer).appendMetadata(anyLong(), anyBoolean());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
realWriter.hasMetadata = true;
return null;
}
}).when(writer).appendMetadata(anyLong(), anyBoolean(), anyCollection());
doAnswer(new Answer<Path>() {
@Override
public Path answer(InvocationOnMock invocation) throws Throwable {