HBASE-26065 StripeStoreFileManager does not need to throw IOException for most methods (#3459)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
parent
1883889e26
commit
a3ad97fbfc
|
@ -88,9 +88,9 @@ class DefaultStoreFileManager implements StoreFileManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
|
public void insertNewFiles(Collection<HStoreFile> sfs) {
|
||||||
this.storefiles =
|
this.storefiles =
|
||||||
ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(this.storefiles, sfs));
|
ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(this.storefiles, sfs));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -132,11 +132,10 @@ class DefaultStoreFileManager implements StoreFileManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles)
|
public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles) {
|
||||||
throws IOException {
|
|
||||||
this.compactedfiles =
|
this.compactedfiles =
|
||||||
this.compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf))
|
this.compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf))
|
||||||
.sorted(storeFileComparator).collect(ImmutableList.toImmutableList());
|
.sorted(storeFileComparator).collect(ImmutableList.toImmutableList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -32,12 +32,15 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages the store files and basic metadata about that that determines the logical structure
|
* Manages the store files and basic metadata about that that determines the logical structure (e.g.
|
||||||
* (e.g. what files to return for scan, how to determine split point, and such).
|
* what files to return for scan, how to determine split point, and such). Does NOT affect the
|
||||||
* Does NOT affect the physical structure of files in HDFS.
|
* physical structure of files in HDFS. Example alternative structures - the default list of files
|
||||||
* Example alternative structures - the default list of files by seqNum; levelDB one sorted
|
* by seqNum; levelDB one sorted by level and seqNum.
|
||||||
* by level and seqNum.
|
* <p/>
|
||||||
*
|
* Notice that, all the states are only in memory, we do not persist anything here. The only place
|
||||||
|
* where we throw an {@link IOException} is the {@link #getSplitPoint()} method, where we need to
|
||||||
|
* read startKey, endKey etc, which may lead to an {@link IOException}.
|
||||||
|
* <p/>
|
||||||
* Implementations are assumed to be not thread safe.
|
* Implementations are assumed to be not thread safe.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -52,22 +55,20 @@ public interface StoreFileManager {
|
||||||
* Adds new files, either for from MemStore flush or bulk insert, into the structure.
|
* Adds new files, either for from MemStore flush or bulk insert, into the structure.
|
||||||
* @param sfs New store files.
|
* @param sfs New store files.
|
||||||
*/
|
*/
|
||||||
void insertNewFiles(Collection<HStoreFile> sfs) throws IOException;
|
void insertNewFiles(Collection<HStoreFile> sfs);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds only the new compaction results into the structure.
|
* Adds only the new compaction results into the structure.
|
||||||
* @param compactedFiles The input files for the compaction.
|
* @param compactedFiles The input files for the compaction.
|
||||||
* @param results The resulting files for the compaction.
|
* @param results The resulting files for the compaction.
|
||||||
*/
|
*/
|
||||||
void addCompactionResults(
|
void addCompactionResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results);
|
||||||
Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results) throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the compacted files
|
* Remove the compacted files
|
||||||
* @param compactedFiles the list of compacted files
|
* @param compactedFiles the list of compacted files
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
void removeCompactedFiles(Collection<HStoreFile> compactedFiles) throws IOException;
|
void removeCompactedFiles(Collection<HStoreFile> compactedFiles);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clears all the files currently in use and returns them.
|
* Clears all the files currently in use and returns them.
|
||||||
|
@ -145,7 +146,6 @@ public interface StoreFileManager {
|
||||||
/**
|
/**
|
||||||
* Gets the split point for the split of this set of store files (approx. middle).
|
* Gets the split point for the split of this set of store files (approx. middle).
|
||||||
* @return The mid-point if possible.
|
* @return The mid-point if possible.
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
Optional<byte[]> getSplitPoint() throws IOException;
|
Optional<byte[]> getSplitPoint() throws IOException;
|
||||||
|
|
||||||
|
|
|
@ -152,10 +152,9 @@ public class StripeStoreFileManager
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
|
public void insertNewFiles(Collection<HStoreFile> sfs) {
|
||||||
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
|
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
|
||||||
// Passing null does not cause NPE??
|
cmc.mergeResults(Collections.emptyList(), sfs);
|
||||||
cmc.mergeResults(null, sfs);
|
|
||||||
debugDumpState("Added new files");
|
debugDumpState("Added new files");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -321,11 +320,11 @@ public class StripeStoreFileManager
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addCompactionResults(
|
public void addCompactionResults(Collection<HStoreFile> compactedFiles,
|
||||||
Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results) throws IOException {
|
Collection<HStoreFile> results) {
|
||||||
// See class comment for the assumptions we make here.
|
// See class comment for the assumptions we make here.
|
||||||
LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
|
LOG.debug("Attempting to merge compaction results: " + compactedFiles.size() +
|
||||||
+ " files replaced by " + results.size());
|
" files replaced by " + results.size());
|
||||||
// In order to be able to fail in the middle of the operation, we'll operate on lazy
|
// In order to be able to fail in the middle of the operation, we'll operate on lazy
|
||||||
// copies and apply the result at the end.
|
// copies and apply the result at the end.
|
||||||
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
|
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
|
||||||
|
@ -345,7 +344,7 @@ public class StripeStoreFileManager
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) throws IOException {
|
public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
|
||||||
// See class comment for the assumptions we make here.
|
// See class comment for the assumptions we make here.
|
||||||
LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
|
LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
|
||||||
// In order to be able to fail in the middle of the operation, we'll operate on lazy
|
// In order to be able to fail in the middle of the operation, we'll operate on lazy
|
||||||
|
@ -728,13 +727,15 @@ public class StripeStoreFileManager
|
||||||
this.isFlush = isFlush;
|
this.isFlush = isFlush;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mergeResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results)
|
private void mergeResults(Collection<HStoreFile> compactedFiles,
|
||||||
throws IOException {
|
Collection<HStoreFile> results) {
|
||||||
assert this.compactedFiles == null && this.results == null;
|
assert this.compactedFiles == null && this.results == null;
|
||||||
this.compactedFiles = compactedFiles;
|
this.compactedFiles = compactedFiles;
|
||||||
this.results = results;
|
this.results = results;
|
||||||
// Do logical processing.
|
// Do logical processing.
|
||||||
if (!isFlush) removeCompactedFiles();
|
if (!isFlush) {
|
||||||
|
removeCompactedFiles();
|
||||||
|
}
|
||||||
TreeMap<byte[], HStoreFile> newStripes = processResults();
|
TreeMap<byte[], HStoreFile> newStripes = processResults();
|
||||||
if (newStripes != null) {
|
if (newStripes != null) {
|
||||||
processNewCandidateStripes(newStripes);
|
processNewCandidateStripes(newStripes);
|
||||||
|
@ -745,7 +746,7 @@ public class StripeStoreFileManager
|
||||||
updateMetadataMaps();
|
updateMetadataMaps();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteResults(Collection<HStoreFile> compactedFiles) throws IOException {
|
private void deleteResults(Collection<HStoreFile> compactedFiles) {
|
||||||
this.compactedFiles = compactedFiles;
|
this.compactedFiles = compactedFiles;
|
||||||
// Create new state and update parent.
|
// Create new state and update parent.
|
||||||
State state = createNewState(true);
|
State state = createNewState(true);
|
||||||
|
@ -828,11 +829,11 @@ public class StripeStoreFileManager
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process new files, and add them either to the structure of existing stripes,
|
* Process new files, and add them either to the structure of existing stripes, or to the list
|
||||||
* or to the list of new candidate stripes.
|
* of new candidate stripes.
|
||||||
* @return New candidate stripes.
|
* @return New candidate stripes.
|
||||||
*/
|
*/
|
||||||
private TreeMap<byte[], HStoreFile> processResults() throws IOException {
|
private TreeMap<byte[], HStoreFile> processResults() {
|
||||||
TreeMap<byte[], HStoreFile> newStripes = null;
|
TreeMap<byte[], HStoreFile> newStripes = null;
|
||||||
for (HStoreFile sf : this.results) {
|
for (HStoreFile sf : this.results) {
|
||||||
byte[] startRow = startOf(sf), endRow = endOf(sf);
|
byte[] startRow = startOf(sf), endRow = endOf(sf);
|
||||||
|
@ -859,8 +860,9 @@ public class StripeStoreFileManager
|
||||||
}
|
}
|
||||||
HStoreFile oldSf = newStripes.put(endRow, sf);
|
HStoreFile oldSf = newStripes.put(endRow, sf);
|
||||||
if (oldSf != null) {
|
if (oldSf != null) {
|
||||||
throw new IOException("Compactor has produced multiple files for the stripe ending in ["
|
throw new IllegalStateException(
|
||||||
+ Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
|
"Compactor has produced multiple files for the stripe ending in [" +
|
||||||
|
Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return newStripes;
|
return newStripes;
|
||||||
|
@ -869,7 +871,7 @@ public class StripeStoreFileManager
|
||||||
/**
|
/**
|
||||||
* Remove compacted files.
|
* Remove compacted files.
|
||||||
*/
|
*/
|
||||||
private void removeCompactedFiles() throws IOException {
|
private void removeCompactedFiles() {
|
||||||
for (HStoreFile oldFile : this.compactedFiles) {
|
for (HStoreFile oldFile : this.compactedFiles) {
|
||||||
byte[] oldEndRow = endOf(oldFile);
|
byte[] oldEndRow = endOf(oldFile);
|
||||||
List<HStoreFile> source = null;
|
List<HStoreFile> source = null;
|
||||||
|
@ -878,13 +880,14 @@ public class StripeStoreFileManager
|
||||||
} else {
|
} else {
|
||||||
int stripeIndex = findStripeIndexByEndRow(oldEndRow);
|
int stripeIndex = findStripeIndexByEndRow(oldEndRow);
|
||||||
if (stripeIndex < 0) {
|
if (stripeIndex < 0) {
|
||||||
throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
|
throw new IllegalStateException(
|
||||||
+ " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
|
"An allegedly compacted file [" + oldFile + "] does not belong" +
|
||||||
|
" to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
|
||||||
}
|
}
|
||||||
source = getStripeCopy(stripeIndex);
|
source = getStripeCopy(stripeIndex);
|
||||||
}
|
}
|
||||||
if (!source.remove(oldFile)) {
|
if (!source.remove(oldFile)) {
|
||||||
throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
|
LOG.warn("An allegedly compacted file [{}] was not found", oldFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -894,8 +897,7 @@ public class StripeStoreFileManager
|
||||||
* new candidate stripes/removes old stripes; produces new set of stripe end rows.
|
* new candidate stripes/removes old stripes; produces new set of stripe end rows.
|
||||||
* @param newStripes New stripes - files by end row.
|
* @param newStripes New stripes - files by end row.
|
||||||
*/
|
*/
|
||||||
private void processNewCandidateStripes(
|
private void processNewCandidateStripes(TreeMap<byte[], HStoreFile> newStripes) {
|
||||||
TreeMap<byte[], HStoreFile> newStripes) throws IOException {
|
|
||||||
// Validate that the removed and added aggregate ranges still make for a full key space.
|
// Validate that the removed and added aggregate ranges still make for a full key space.
|
||||||
boolean hasStripes = !this.stripeFiles.isEmpty();
|
boolean hasStripes = !this.stripeFiles.isEmpty();
|
||||||
this.stripeEndRows = new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
|
this.stripeEndRows = new ArrayList<>(Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
|
||||||
|
@ -903,7 +905,7 @@ public class StripeStoreFileManager
|
||||||
byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
|
byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
|
||||||
byte[] lastEndRow = newStripes.lastKey();
|
byte[] lastEndRow = newStripes.lastKey();
|
||||||
if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
|
if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
|
||||||
throw new IOException("Newly created stripes do not cover the entire key space.");
|
throw new IllegalStateException("Newly created stripes do not cover the entire key space.");
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean canAddNewStripes = true;
|
boolean canAddNewStripes = true;
|
||||||
|
@ -915,11 +917,15 @@ public class StripeStoreFileManager
|
||||||
removeFrom = 0;
|
removeFrom = 0;
|
||||||
} else {
|
} else {
|
||||||
removeFrom = findStripeIndexByEndRow(firstStartRow);
|
removeFrom = findStripeIndexByEndRow(firstStartRow);
|
||||||
if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
|
if (removeFrom < 0) {
|
||||||
|
throw new IllegalStateException("Compaction is trying to add a bad range.");
|
||||||
|
}
|
||||||
++removeFrom;
|
++removeFrom;
|
||||||
}
|
}
|
||||||
int removeTo = findStripeIndexByEndRow(lastEndRow);
|
int removeTo = findStripeIndexByEndRow(lastEndRow);
|
||||||
if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
|
if (removeTo < 0) {
|
||||||
|
throw new IllegalStateException("Compaction is trying to add a bad range.");
|
||||||
|
}
|
||||||
// See if there are files in the stripes we are trying to replace.
|
// See if there are files in the stripes we are trying to replace.
|
||||||
ArrayList<HStoreFile> conflictingFiles = new ArrayList<>();
|
ArrayList<HStoreFile> conflictingFiles = new ArrayList<>();
|
||||||
for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
|
for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
|
||||||
|
@ -961,7 +967,9 @@ public class StripeStoreFileManager
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!canAddNewStripes) return; // Files were already put into L0.
|
if (!canAddNewStripes) {
|
||||||
|
return; // Files were already put into L0.
|
||||||
|
}
|
||||||
|
|
||||||
// Now, insert new stripes. The total ranges match, so we can insert where we removed.
|
// Now, insert new stripes. The total ranges match, so we can insert where we removed.
|
||||||
byte[] previousEndRow = null;
|
byte[] previousEndRow = null;
|
||||||
|
@ -972,8 +980,8 @@ public class StripeStoreFileManager
|
||||||
assert !isOpen(previousEndRow);
|
assert !isOpen(previousEndRow);
|
||||||
byte[] startRow = startOf(newStripe.getValue());
|
byte[] startRow = startOf(newStripe.getValue());
|
||||||
if (!rowEquals(previousEndRow, startRow)) {
|
if (!rowEquals(previousEndRow, startRow)) {
|
||||||
throw new IOException("The new stripes produced by "
|
throw new IllegalStateException("The new stripes produced by " +
|
||||||
+ (isFlush ? "flush" : "compaction") + " are not contiguous");
|
(isFlush ? "flush" : "compaction") + " are not contiguous");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Add the new stripe.
|
// Add the new stripe.
|
||||||
|
|
|
@ -21,8 +21,9 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -542,14 +543,10 @@ public class TestStripeStoreFileManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyInvalidCompactionScenario(StripeStoreFileManager manager,
|
private void verifyInvalidCompactionScenario(StripeStoreFileManager manager,
|
||||||
ArrayList<HStoreFile> filesToCompact, ArrayList<HStoreFile> filesToInsert) throws Exception {
|
ArrayList<HStoreFile> filesToCompact, ArrayList<HStoreFile> filesToInsert) throws Exception {
|
||||||
Collection<HStoreFile> allFiles = manager.getStorefiles();
|
Collection<HStoreFile> allFiles = manager.getStorefiles();
|
||||||
try {
|
assertThrows(IllegalStateException.class,
|
||||||
manager.addCompactionResults(filesToCompact, filesToInsert);
|
() -> manager.addCompactionResults(filesToCompact, filesToInsert));
|
||||||
fail("Should have thrown");
|
|
||||||
} catch (IOException ex) {
|
|
||||||
// Ignore it.
|
|
||||||
}
|
|
||||||
verifyAllFiles(manager, allFiles); // must have the same files.
|
verifyAllFiles(manager, allFiles); // must have the same files.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue