HBASE-26791 Memstore flush fencing issue for SFT (#4202)
Signed-off-by: Josh Elser <elserj@apache.org> Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
0d7638d66d
commit
e56ed404cb
|
@ -38,13 +38,6 @@ class DefaultStoreFileTracker extends StoreFileTrackerBase {
|
||||||
super(conf, isPrimaryReplica, ctx);
|
super(conf, isPrimaryReplica, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<StoreFileInfo> load() throws IOException {
|
|
||||||
List<StoreFileInfo> files =
|
|
||||||
ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
|
|
||||||
return files != null ? files : Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean requireWritingToTmpDirFirst() {
|
public boolean requireWritingToTmpDirFirst() {
|
||||||
return true;
|
return true;
|
||||||
|
@ -62,7 +55,13 @@ class DefaultStoreFileTracker extends StoreFileTrackerBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void set(List<StoreFileInfo> files) {
|
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
|
||||||
// NOOP
|
List<StoreFileInfo> files =
|
||||||
|
ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
|
||||||
|
return files != null ? files : Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,8 +67,8 @@ class FileBasedStoreFileTracker extends StoreFileTrackerBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<StoreFileInfo> load() throws IOException {
|
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
|
||||||
StoreFileList list = backedFile.load();
|
StoreFileList list = backedFile.load(readOnly);
|
||||||
if (list == null) {
|
if (list == null) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
@ -148,7 +148,7 @@ class FileBasedStoreFileTracker extends StoreFileTrackerBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void set(List<StoreFileInfo> files) throws IOException {
|
protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
|
||||||
synchronized (storefiles) {
|
synchronized (storefiles) {
|
||||||
storefiles.clear();
|
storefiles.clear();
|
||||||
StoreFileList.Builder builder = StoreFileList.newBuilder();
|
StoreFileList.Builder builder = StoreFileList.newBuilder();
|
||||||
|
|
|
@ -49,13 +49,6 @@ class MigrationStoreFileTracker extends StoreFileTrackerBase {
|
||||||
"src and dst is the same: %s", src.getClass());
|
"src and dst is the same: %s", src.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<StoreFileInfo> load() throws IOException {
|
|
||||||
List<StoreFileInfo> files = src.load();
|
|
||||||
dst.set(files);
|
|
||||||
return files;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean requireWritingToTmpDirFirst() {
|
public boolean requireWritingToTmpDirFirst() {
|
||||||
// Returns true if either of the two StoreFileTracker returns true.
|
// Returns true if either of the two StoreFileTracker returns true.
|
||||||
|
@ -67,6 +60,15 @@ class MigrationStoreFileTracker extends StoreFileTrackerBase {
|
||||||
return src.requireWritingToTmpDirFirst() || dst.requireWritingToTmpDirFirst();
|
return src.requireWritingToTmpDirFirst() || dst.requireWritingToTmpDirFirst();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
|
||||||
|
List<StoreFileInfo> files = src.doLoadStoreFiles(readOnly);
|
||||||
|
if (!readOnly) {
|
||||||
|
dst.doSetStoreFiles(files);
|
||||||
|
}
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
|
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
|
||||||
src.doAddNewStoreFiles(newFiles);
|
src.doAddNewStoreFiles(newFiles);
|
||||||
|
@ -81,7 +83,7 @@ class MigrationStoreFileTracker extends StoreFileTrackerBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void set(List<StoreFileInfo> files) {
|
protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"Should not call this method on " + getClass().getSimpleName());
|
"Should not call this method on " + getClass().getSimpleName());
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,17 +20,29 @@ package org.apache.hadoop.hbase.regionserver.storefiletracker;
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
import java.util.zip.CRC32;
|
import java.util.zip.CRC32;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreContext;
|
import org.apache.hadoop.hbase.regionserver.StoreContext;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -55,9 +67,13 @@ class StoreFileListFile {
|
||||||
|
|
||||||
static final String TRACK_FILE_DIR = ".filelist";
|
static final String TRACK_FILE_DIR = ".filelist";
|
||||||
|
|
||||||
private static final String TRACK_FILE = "f1";
|
private static final String TRACK_FILE_PREFIX = "f1";
|
||||||
|
|
||||||
private static final String TRACK_FILE_ROTATE = "f2";
|
private static final String TRACK_FILE_ROTATE_PREFIX = "f2";
|
||||||
|
|
||||||
|
private static final char TRACK_FILE_SEPARATOR = '.';
|
||||||
|
|
||||||
|
private static final Pattern TRACK_FILE_PATTERN = Pattern.compile("^f(1|2)\\.\\d+$");
|
||||||
|
|
||||||
// 16 MB, which is big enough for a tracker file
|
// 16 MB, which is big enough for a tracker file
|
||||||
private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;
|
private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;
|
||||||
|
@ -76,8 +92,6 @@ class StoreFileListFile {
|
||||||
StoreFileListFile(StoreContext ctx) {
|
StoreFileListFile(StoreContext ctx) {
|
||||||
this.ctx = ctx;
|
this.ctx = ctx;
|
||||||
trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
|
trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
|
||||||
trackFiles[0] = new Path(trackFileDir, TRACK_FILE);
|
|
||||||
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private StoreFileList load(Path path) throws IOException {
|
private StoreFileList load(Path path) throws IOException {
|
||||||
|
@ -114,23 +128,103 @@ class StoreFileListFile {
|
||||||
return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
|
return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
StoreFileList load() throws IOException {
|
// file sequence id to path
|
||||||
|
private NavigableMap<Long, List<Path>> listFiles() throws IOException {
|
||||||
|
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
|
||||||
|
FileStatus[] statuses;
|
||||||
|
try {
|
||||||
|
statuses = fs.listStatus(trackFileDir);
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
LOG.debug("Track file directory {} does not exist", trackFileDir, e);
|
||||||
|
return Collections.emptyNavigableMap();
|
||||||
|
}
|
||||||
|
if (statuses == null || statuses.length == 0) {
|
||||||
|
return Collections.emptyNavigableMap();
|
||||||
|
}
|
||||||
|
TreeMap<Long, List<Path>> map = new TreeMap<>((l1, l2) -> l2.compareTo(l1));
|
||||||
|
for (FileStatus status : statuses) {
|
||||||
|
Path file = status.getPath();
|
||||||
|
if (!status.isFile()) {
|
||||||
|
LOG.warn("Found invalid track file {}, which is not a file", file);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!TRACK_FILE_PATTERN.matcher(file.getName()).matches()) {
|
||||||
|
LOG.warn("Found invalid track file {}, skip", file);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
List<String> parts = Splitter.on(TRACK_FILE_SEPARATOR).splitToList(file.getName());
|
||||||
|
map.computeIfAbsent(Long.parseLong(parts.get(1)), k -> new ArrayList<>()).add(file);
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeTrackFiles(long seqId) {
|
||||||
|
trackFiles[0] = new Path(trackFileDir, TRACK_FILE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
|
||||||
|
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
|
||||||
|
LOG.info("Initialized track files: {}, {}", trackFiles[0], trackFiles[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanUpTrackFiles(long loadedSeqId,
|
||||||
|
NavigableMap<Long, List<Path>> seqId2TrackFiles) {
|
||||||
|
LOG.info("Cleanup track file with sequence id < {}", loadedSeqId);
|
||||||
|
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
|
||||||
|
NavigableMap<Long, List<Path>> toDelete =
|
||||||
|
loadedSeqId >= 0 ? seqId2TrackFiles.tailMap(loadedSeqId, false) : seqId2TrackFiles;
|
||||||
|
toDelete.values().stream().flatMap(l -> l.stream()).forEach(file -> {
|
||||||
|
ForkJoinPool.commonPool().execute(() -> {
|
||||||
|
LOG.info("Deleting track file {}", file);
|
||||||
|
try {
|
||||||
|
fs.delete(file, false);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("failed to delete unused track file {}", file, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
StoreFileList load(boolean readOnly) throws IOException {
|
||||||
|
NavigableMap<Long, List<Path>> seqId2TrackFiles = listFiles();
|
||||||
|
long seqId = -1L;
|
||||||
StoreFileList[] lists = new StoreFileList[2];
|
StoreFileList[] lists = new StoreFileList[2];
|
||||||
for (int i = 0; i < 2; i++) {
|
for (Map.Entry<Long, List<Path>> entry : seqId2TrackFiles.entrySet()) {
|
||||||
try {
|
List<Path> files = entry.getValue();
|
||||||
lists[i] = load(trackFiles[i]);
|
// should not have more than 2 files, if not, it means that the track files are broken, just
|
||||||
} catch (FileNotFoundException | EOFException e) {
|
// throw exception out and fail the region open.
|
||||||
// this is normal case, so use info and do not log stacktrace
|
if (files.size() > 2) {
|
||||||
LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString());
|
throw new DoNotRetryIOException("Should only have at most 2 track files for sequence id " +
|
||||||
|
entry.getKey() + ", but got " + files.size() + " files: " + files);
|
||||||
|
}
|
||||||
|
boolean loaded = false;
|
||||||
|
for (int i = 0; i < files.size(); i++) {
|
||||||
|
try {
|
||||||
|
lists[i] = load(files.get(i));
|
||||||
|
loaded = true;
|
||||||
|
} catch (EOFException e) {
|
||||||
|
// this is normal case, so use info and do not log stacktrace
|
||||||
|
LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (loaded) {
|
||||||
|
seqId = entry.getKey();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int winnerIndex = select(lists);
|
if (readOnly) {
|
||||||
if (lists[winnerIndex] != null) {
|
return lists[select(lists)];
|
||||||
nextTrackFile = 1 - winnerIndex;
|
|
||||||
prevTimestamp = lists[winnerIndex].getTimestamp();
|
|
||||||
} else {
|
|
||||||
nextTrackFile = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cleanUpTrackFiles(seqId, seqId2TrackFiles);
|
||||||
|
|
||||||
|
if (seqId < 0) {
|
||||||
|
initializeTrackFiles(System.currentTimeMillis());
|
||||||
|
nextTrackFile = 0;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
initializeTrackFiles(Math.max(System.currentTimeMillis(), seqId + 1));
|
||||||
|
int winnerIndex = select(lists);
|
||||||
|
nextTrackFile = 1 - winnerIndex;
|
||||||
|
prevTimestamp = lists[winnerIndex].getTimestamp();
|
||||||
return lists[winnerIndex];
|
return lists[winnerIndex];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,7 +234,8 @@ class StoreFileListFile {
|
||||||
void update(StoreFileList.Builder builder) throws IOException {
|
void update(StoreFileList.Builder builder) throws IOException {
|
||||||
if (nextTrackFile < 0) {
|
if (nextTrackFile < 0) {
|
||||||
// we need to call load first to load the prevTimestamp and also the next file
|
// we need to call load first to load the prevTimestamp and also the next file
|
||||||
load();
|
// we are already in the update method, which is not read only, so pass false
|
||||||
|
load(false);
|
||||||
}
|
}
|
||||||
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
|
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
|
||||||
byte[] actualData = builder.setTimestamp(timestamp).build().toByteArray();
|
byte[] actualData = builder.setTimestamp(timestamp).build().toByteArray();
|
||||||
|
@ -162,7 +257,7 @@ class StoreFileListFile {
|
||||||
fs.delete(trackFiles[nextTrackFile], false);
|
fs.delete(trackFiles[nextTrackFile], false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// we will create new file with overwrite = true, so not a big deal here, only for speed up
|
// we will create new file with overwrite = true, so not a big deal here, only for speed up
|
||||||
// loading as we do not need to read this file when loading(we will hit FileNotFoundException)
|
// loading as we do not need to read this file when loading
|
||||||
LOG.debug("failed to delete old track file {}, not a big deal, just ignore", e);
|
LOG.debug("failed to delete old track file {}, not a big deal, just ignore", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTra
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
@ -66,6 +66,11 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
|
||||||
this.ctx = ctx;
|
this.ctx = ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final List<StoreFileInfo> load() throws IOException {
|
||||||
|
return doLoadStoreFiles(!isPrimaryReplica);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
|
public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
|
||||||
if (isPrimaryReplica) {
|
if (isPrimaryReplica) {
|
||||||
|
@ -81,6 +86,13 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final void set(List<StoreFileInfo> files) throws IOException {
|
||||||
|
if (isPrimaryReplica) {
|
||||||
|
doSetStoreFiles(files);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
|
public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
|
||||||
builder.setValue(TRACKER_IMPL, getTrackerName());
|
builder.setValue(TRACKER_IMPL, getTrackerName());
|
||||||
|
@ -173,8 +185,19 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For primary replica, we will call load once when opening a region, and the implementation could
|
||||||
|
* choose to do some cleanup work. So here we use {@code readOnly} to indicate that whether you
|
||||||
|
* are allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to
|
||||||
|
* {@code true}.
|
||||||
|
*/
|
||||||
|
protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;
|
||||||
|
|
||||||
protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
|
protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
|
||||||
|
|
||||||
protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
|
protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
|
||||||
Collection<StoreFileInfo> newFiles) throws IOException;
|
Collection<StoreFileInfo> newFiles) throws IOException;
|
||||||
|
|
||||||
|
protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class StoreFileTrackerForTest extends DefaultStoreFileTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<StoreFileInfo> load() throws IOException {
|
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
|
||||||
return new ArrayList<>(trackedFiles.get(storeId));
|
return new ArrayList<>(trackedFiles.get(storeId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,8 +17,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.storefiletracker;
|
package org.apache.hadoop.hbase.regionserver.storefiletracker;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertThrows;
|
import static org.junit.Assert.assertThrows;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -47,6 +49,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
|
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
|
||||||
|
|
||||||
@Category({ RegionServerTests.class, SmallTests.class })
|
@Category({ RegionServerTests.class, SmallTests.class })
|
||||||
|
@ -67,14 +70,18 @@ public class TestStoreFileListFile {
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
|
||||||
@Before
|
private StoreFileListFile create() throws IOException {
|
||||||
public void setUp() throws IOException {
|
|
||||||
testDir = UTIL.getDataTestDir(name.getMethodName());
|
|
||||||
HRegionFileSystem hfs = mock(HRegionFileSystem.class);
|
HRegionFileSystem hfs = mock(HRegionFileSystem.class);
|
||||||
when(hfs.getFileSystem()).thenReturn(FileSystem.get(UTIL.getConfiguration()));
|
when(hfs.getFileSystem()).thenReturn(FileSystem.get(UTIL.getConfiguration()));
|
||||||
StoreContext ctx = StoreContext.getBuilder().withFamilyStoreDirectoryPath(testDir)
|
StoreContext ctx = StoreContext.getBuilder().withFamilyStoreDirectoryPath(testDir)
|
||||||
.withRegionFileSystem(hfs).build();
|
.withRegionFileSystem(hfs).build();
|
||||||
storeFileListFile = new StoreFileListFile(ctx);
|
return new StoreFileListFile(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
testDir = UTIL.getDataTestDir(name.getMethodName());
|
||||||
|
storeFileListFile = create();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -84,7 +91,7 @@ public class TestStoreFileListFile {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyLoad() throws IOException {
|
public void testEmptyLoad() throws IOException {
|
||||||
assertNull(storeFileListFile.load());
|
assertNull(storeFileListFile.load(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
private FileStatus getOnlyTrackerFile(FileSystem fs) throws IOException {
|
private FileStatus getOnlyTrackerFile(FileSystem fs) throws IOException {
|
||||||
|
@ -114,7 +121,7 @@ public class TestStoreFileListFile {
|
||||||
trackerFileStatus.getLen(), trackerFileStatus.getLen() / 2);
|
trackerFileStatus.getLen(), trackerFileStatus.getLen() / 2);
|
||||||
byte[] content = readAll(fs, trackerFileStatus.getPath());
|
byte[] content = readAll(fs, trackerFileStatus.getPath());
|
||||||
write(fs, trackerFileStatus.getPath(), content, 0, content.length / 2);
|
write(fs, trackerFileStatus.getPath(), content, 0, content.length / 2);
|
||||||
assertNull(storeFileListFile.load());
|
assertNull(storeFileListFile.load(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeInt(byte[] buf, int off, int value) {
|
private void writeInt(byte[] buf, int off, int value) {
|
||||||
|
@ -134,7 +141,7 @@ public class TestStoreFileListFile {
|
||||||
byte[] content = readAll(fs, trackerFileStatus.getPath());
|
byte[] content = readAll(fs, trackerFileStatus.getPath());
|
||||||
writeInt(content, 0, 0);
|
writeInt(content, 0, 0);
|
||||||
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
|
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
|
||||||
assertThrows(IOException.class, () -> storeFileListFile.load());
|
assertThrows(IOException.class, () -> storeFileListFile.load(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -147,7 +154,7 @@ public class TestStoreFileListFile {
|
||||||
byte[] content = readAll(fs, trackerFileStatus.getPath());
|
byte[] content = readAll(fs, trackerFileStatus.getPath());
|
||||||
writeInt(content, 0, 128 * 1024 * 1024);
|
writeInt(content, 0, 128 * 1024 * 1024);
|
||||||
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
|
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
|
||||||
assertThrows(IOException.class, () -> storeFileListFile.load());
|
assertThrows(IOException.class, () -> storeFileListFile.load(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -160,6 +167,59 @@ public class TestStoreFileListFile {
|
||||||
byte[] content = readAll(fs, trackerFileStatus.getPath());
|
byte[] content = readAll(fs, trackerFileStatus.getPath());
|
||||||
content[5] = (byte) ~content[5];
|
content[5] = (byte) ~content[5];
|
||||||
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
|
write(fs, trackerFileStatus.getPath(), content, 0, content.length);
|
||||||
assertThrows(IOException.class, () -> storeFileListFile.load());
|
assertThrows(IOException.class, () -> storeFileListFile.load(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLoadNewerTrackFiles() throws IOException, InterruptedException {
|
||||||
|
StoreFileList.Builder builder = StoreFileList.newBuilder();
|
||||||
|
storeFileListFile.update(builder);
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||||
|
FileStatus trackFileStatus = getOnlyTrackerFile(fs);
|
||||||
|
|
||||||
|
builder.addStoreFile(StoreFileEntry.newBuilder().setName("hehe").setSize(10).build());
|
||||||
|
storeFileListFile = create();
|
||||||
|
storeFileListFile.update(builder);
|
||||||
|
|
||||||
|
// should load the list we stored the second time
|
||||||
|
storeFileListFile = create();
|
||||||
|
StoreFileList list = storeFileListFile.load(true);
|
||||||
|
assertEquals(1, list.getStoreFileCount());
|
||||||
|
// since read only is true, we should not delete the old track file
|
||||||
|
// the deletion is in background, so we will test it multiple times through HTU.waitFor and make
|
||||||
|
// sure that it is still there after timeout, i.e, the waitFor method returns -1
|
||||||
|
assertTrue(UTIL.waitFor(2000, 100, false, () -> !fs.exists(testDir)) < 0);
|
||||||
|
|
||||||
|
// this time read only is false, we should delete the old track file
|
||||||
|
list = storeFileListFile.load(false);
|
||||||
|
assertEquals(1, list.getStoreFileCount());
|
||||||
|
UTIL.waitFor(5000, () -> !fs.exists(trackFileStatus.getPath()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is to simulate the scenario where a 'dead' RS perform flush or compaction on a region
|
||||||
|
// which has already been reassigned to another RS. This is possible in real world, usually caused
|
||||||
|
// by a long STW GC.
|
||||||
|
@Test
|
||||||
|
public void testConcurrentUpdate() throws IOException {
|
||||||
|
storeFileListFile.update(StoreFileList.newBuilder());
|
||||||
|
|
||||||
|
StoreFileListFile storeFileListFile2 = create();
|
||||||
|
storeFileListFile2.update(StoreFileList.newBuilder()
|
||||||
|
.addStoreFile(StoreFileEntry.newBuilder().setName("hehe").setSize(10).build()));
|
||||||
|
|
||||||
|
// let's update storeFileListFile several times
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
storeFileListFile.update(StoreFileList.newBuilder()
|
||||||
|
.addStoreFile(StoreFileEntry.newBuilder().setName("haha-" + i).setSize(100 + i).build()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a new list file, make sure we load the list generate by storeFileListFile2.
|
||||||
|
StoreFileListFile storeFileListFile3 = create();
|
||||||
|
StoreFileList fileList = storeFileListFile3.load(true);
|
||||||
|
assertEquals(1, fileList.getStoreFileCount());
|
||||||
|
StoreFileEntry entry = fileList.getStoreFile(0);
|
||||||
|
assertEquals("hehe", entry.getName());
|
||||||
|
assertEquals(10, entry.getSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue