HBASE-26079 Use StoreFileTracker when splitting and merging (#3617)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
43b40e9374
commit
6e053765e8
|
@ -24,7 +24,6 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -56,6 +55,8 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
|||
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
|
@ -587,30 +588,35 @@ public class MergeTableRegionsProcedure
|
|||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
|
||||
final FileSystem fs = mfs.getFileSystem();
|
||||
|
||||
List<Path> mergedFiles = new ArrayList<>();
|
||||
HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
|
||||
env.getMasterConfiguration(), fs, tableDir, mergedRegion);
|
||||
|
||||
for (RegionInfo ri: this.regionsToMerge) {
|
||||
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
|
||||
env.getMasterConfiguration(), fs, tableDir, ri, false);
|
||||
mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
|
||||
mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
|
||||
}
|
||||
assert mergeRegionFs != null;
|
||||
mergeRegionFs.commitMergedRegion();
|
||||
mergeRegionFs.commitMergedRegion(mergedFiles, env);
|
||||
|
||||
// Prepare to create merged regions
|
||||
env.getAssignmentManager().getRegionStates().
|
||||
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
|
||||
}
|
||||
|
||||
private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
|
||||
private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
|
||||
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
|
||||
final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
|
||||
.get(mergedRegion.getTable());
|
||||
List<Path> mergedFiles = new ArrayList<>();
|
||||
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
|
||||
String family = hcd.getNameAsString();
|
||||
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
|
||||
Configuration trackerConfig =
|
||||
StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd);
|
||||
StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
|
||||
family, regionFs);
|
||||
final Collection<StoreFileInfo> storeFiles = tracker.load();
|
||||
if (storeFiles != null && storeFiles.size() > 0) {
|
||||
final Configuration storeConfiguration =
|
||||
StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd);
|
||||
|
@ -622,11 +628,13 @@ public class MergeTableRegionsProcedure
|
|||
// is running in a regionserver's Store context, or we might not be able
|
||||
// to read the hfiles.
|
||||
storeFileInfo.setConf(storeConfiguration);
|
||||
mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
|
||||
Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
|
||||
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
|
||||
mergedFiles.add(refFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
return mergedFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -66,6 +65,8 @@ import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
|
|||
import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -621,21 +622,20 @@ public class SplitTableRegionProcedure
|
|||
final FileSystem fs = mfs.getFileSystem();
|
||||
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
|
||||
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
|
||||
|
||||
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
|
||||
|
||||
Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
|
||||
Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);
|
||||
|
||||
assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
|
||||
assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
|
||||
regionFs.getSplitsDir(daughterOneRI));
|
||||
regionFs.commitDaughterRegion(daughterOneRI);
|
||||
assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
|
||||
regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
|
||||
assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
|
||||
new Path(tabledir, daughterOneRI.getEncodedName()));
|
||||
|
||||
assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
|
||||
assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
|
||||
regionFs.getSplitsDir(daughterTwoRI));
|
||||
regionFs.commitDaughterRegion(daughterTwoRI);
|
||||
assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
|
||||
regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
|
||||
assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
|
||||
new Path(tabledir, daughterTwoRI.getEncodedName()));
|
||||
}
|
||||
|
||||
|
@ -652,7 +652,7 @@ public class SplitTableRegionProcedure
|
|||
* Create Split directory
|
||||
* @param env MasterProcedureEnv
|
||||
*/
|
||||
private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
|
||||
private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
|
||||
final HRegionFileSystem regionFs) throws IOException {
|
||||
final Configuration conf = env.getMasterConfiguration();
|
||||
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
|
||||
|
@ -668,7 +668,11 @@ public class SplitTableRegionProcedure
|
|||
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
|
||||
for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
|
||||
String family = cfd.getNameAsString();
|
||||
Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
|
||||
Configuration trackerConfig = StoreFileTrackerFactory.
|
||||
mergeConfigurations(env.getMasterConfiguration(), htd, htd.getColumnFamily(cfd.getName()));
|
||||
StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
|
||||
family, regionFs);
|
||||
Collection<StoreFileInfo> sfis = tracker.load();
|
||||
if (sfis == null) {
|
||||
continue;
|
||||
}
|
||||
|
@ -694,7 +698,7 @@ public class SplitTableRegionProcedure
|
|||
}
|
||||
if (nbFiles == 0) {
|
||||
// no file needs to be splitted.
|
||||
return new Pair<Integer, Integer>(0, 0);
|
||||
return new Pair<>(Collections.emptyList(), Collections.emptyList());
|
||||
}
|
||||
// Max #threads is the smaller of the number of storefiles or the default max determined above.
|
||||
int maxThreads = Math.min(
|
||||
|
@ -752,14 +756,18 @@ public class SplitTableRegionProcedure
|
|||
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||
}
|
||||
|
||||
int daughterA = 0;
|
||||
int daughterB = 0;
|
||||
List<Path> daughterA = new ArrayList<>();
|
||||
List<Path> daughterB = new ArrayList<>();
|
||||
// Look for any exception
|
||||
for (Future<Pair<Path, Path>> future : futures) {
|
||||
try {
|
||||
Pair<Path, Path> p = future.get();
|
||||
daughterA += p.getFirst() != null ? 1 : 0;
|
||||
daughterB += p.getSecond() != null ? 1 : 0;
|
||||
if(p.getFirst() != null){
|
||||
daughterA.add(p.getFirst());
|
||||
}
|
||||
if(p.getSecond() != null){
|
||||
daughterB.add(p.getSecond());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||
} catch (ExecutionException e) {
|
||||
|
@ -772,7 +780,7 @@ public class SplitTableRegionProcedure
|
|||
getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA +
|
||||
" storefiles, Daughter B: " + daughterB + " storefiles.");
|
||||
}
|
||||
return new Pair<Integer, Integer>(daughterA, daughterB);
|
||||
return new Pair<>(daughterA, daughterB);
|
||||
}
|
||||
|
||||
private void assertSplitResultFilesCount(final FileSystem fs,
|
||||
|
|
|
@ -24,7 +24,9 @@ import java.io.IOException;
|
|||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
@ -49,6 +51,9 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
|||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
@ -595,17 +600,44 @@ public class HRegionFileSystem {
|
|||
* @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
|
||||
* @throws IOException
|
||||
*/
|
||||
public Path commitDaughterRegion(final RegionInfo regionInfo)
|
||||
throws IOException {
|
||||
public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegionFiles,
|
||||
MasterProcedureEnv env) throws IOException {
|
||||
Path regionDir = this.getSplitsDir(regionInfo);
|
||||
if (fs.exists(regionDir)) {
|
||||
// Write HRI to a file in case we need to recover hbase:meta
|
||||
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
|
||||
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
|
||||
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
|
||||
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
|
||||
env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
|
||||
insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
|
||||
}
|
||||
return regionDir;
|
||||
}
|
||||
|
||||
return regionDir;
|
||||
private void insertRegionFilesIntoStoreTracker(List<Path> allFiles, MasterProcedureEnv env,
|
||||
HRegionFileSystem regionFs) throws IOException {
|
||||
TableDescriptor tblDesc = env.getMasterServices().getTableDescriptors().
|
||||
get(regionInfo.getTable());
|
||||
//we need to map trackers per store
|
||||
Map<String, StoreFileTracker> trackerMap = new HashMap<>();
|
||||
//we need to map store files per store
|
||||
Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
|
||||
for(Path file : allFiles) {
|
||||
String familyName = file.getParent().getName();
|
||||
trackerMap.computeIfAbsent(familyName, t -> {
|
||||
Configuration config = StoreFileTrackerFactory.mergeConfigurations(conf, tblDesc,
|
||||
tblDesc.getColumnFamily(Bytes.toBytes(familyName)));
|
||||
return StoreFileTrackerFactory.
|
||||
create(config, true, familyName, regionFs);
|
||||
});
|
||||
fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
|
||||
List<StoreFileInfo> infos = fileInfoMap.get(familyName);
|
||||
infos.add(new StoreFileInfo(conf, fs, file, true));
|
||||
}
|
||||
for(Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
|
||||
entry.getValue().add(fileInfoMap.get(entry.getKey()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -795,13 +827,15 @@ public class HRegionFileSystem {
|
|||
* Commit a merged region, making it ready for use.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void commitMergedRegion() throws IOException {
|
||||
public void commitMergedRegion(List<Path> allMergedFiles, MasterProcedureEnv env)
|
||||
throws IOException {
|
||||
Path regionDir = getMergesDir(regionInfoForFs);
|
||||
if (regionDir != null && fs.exists(regionDir)) {
|
||||
// Write HRI to a file in case we need to recover hbase:meta
|
||||
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
|
||||
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
|
||||
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
|
||||
insertRegionFilesIntoStoreTracker(allMergedFiles, env, this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.hbase.regionserver.StoreContext;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -32,8 +33,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
class DefaultStoreFileTracker extends StoreFileTrackerBase {
|
||||
|
||||
public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
|
||||
StoreContext ctx) {
|
||||
public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
|
||||
super(conf, isPrimaryReplica, ctx);
|
||||
}
|
||||
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface StoreFileTracker {
|
||||
|
||||
/**
|
||||
* Load the store files list when opening a region.
|
||||
*/
|
||||
|
|
|
@ -18,22 +18,51 @@
|
|||
package org.apache.hadoop.hbase.regionserver.storefiletracker;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CompoundConfiguration;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreContext;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Factory method for creating store file tracker.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class StoreFileTrackerFactory {
|
||||
|
||||
public static final String TRACK_IMPL = "hbase.store.file-tracker.impl";
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class);
|
||||
|
||||
public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
|
||||
StoreContext ctx) {
|
||||
Class<? extends StoreFileTracker> tracker =
|
||||
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
|
||||
LOG.info("instantiating StoreFileTracker impl {}", tracker.getName());
|
||||
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
|
||||
}
|
||||
|
||||
public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family,
|
||||
HRegionFileSystem regionFs) {
|
||||
ColumnFamilyDescriptorBuilder fDescBuilder =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
|
||||
StoreContext ctx = StoreContext.getBuilder().
|
||||
withColumnFamilyDescriptor(fDescBuilder.build()).
|
||||
withRegionFileSystem(regionFs).
|
||||
build();
|
||||
return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
|
||||
}
|
||||
|
||||
public static Configuration mergeConfigurations(Configuration global,
|
||||
TableDescriptor table, ColumnFamilyDescriptor family) {
|
||||
return new CompoundConfiguration()
|
||||
.add(global)
|
||||
.addBytesMap(table.getValues())
|
||||
.addStringMap(family.getConfiguration())
|
||||
.addBytesMap(family.getValues());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,6 +67,7 @@ public class TestDefaultStoreEngine {
|
|||
DummyStoreFlusher.class.getName());
|
||||
HRegion mockRegion = Mockito.mock(HRegion.class);
|
||||
HStore mockStore = Mockito.mock(HStore.class);
|
||||
mockStore.conf = conf;
|
||||
Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
|
||||
Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
|
||||
StoreEngine<?, ?, ?, ?> se =
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
|
@ -139,7 +141,9 @@ public class TestDirectStoreSplitsMerges {
|
|||
setRegionId(region.getRegionInfo().getRegionId() +
|
||||
EnvironmentEdgeManager.currentTime()).build();
|
||||
Path splitDir = regionFS.getSplitsDir(daughterA);
|
||||
Path result = regionFS.commitDaughterRegion(daughterA);
|
||||
MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
|
||||
getMasterProcedureExecutor().getEnvironment();
|
||||
Path result = regionFS.commitDaughterRegion(daughterA, new ArrayList<>(), env);
|
||||
assertEquals(splitDir, result);
|
||||
}
|
||||
|
||||
|
@ -162,14 +166,18 @@ public class TestDirectStoreSplitsMerges {
|
|||
Path splitDirA = regionFS.getSplitsDir(daughterA);
|
||||
Path splitDirB = regionFS.getSplitsDir(daughterB);
|
||||
HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
regionFS
|
||||
List<Path> filesA = new ArrayList<>();
|
||||
filesA.add(regionFS
|
||||
.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
|
||||
Bytes.toBytes("002"), false, region.getSplitPolicy());
|
||||
regionFS
|
||||
Bytes.toBytes("002"), false, region.getSplitPolicy()));
|
||||
List<Path> filesB = new ArrayList<>();
|
||||
filesB.add(regionFS
|
||||
.splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file,
|
||||
Bytes.toBytes("002"), true, region.getSplitPolicy());
|
||||
Path resultA = regionFS.commitDaughterRegion(daughterA);
|
||||
Path resultB = regionFS.commitDaughterRegion(daughterB);
|
||||
Bytes.toBytes("002"), true, region.getSplitPolicy()));
|
||||
MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
|
||||
getMasterProcedureExecutor().getEnvironment();
|
||||
Path resultA = regionFS.commitDaughterRegion(daughterA, filesA, env);
|
||||
Path resultB = regionFS.commitDaughterRegion(daughterB, filesB, env);
|
||||
assertEquals(splitDirA, resultA);
|
||||
assertEquals(splitDirB, resultB);
|
||||
}
|
||||
|
@ -203,8 +211,11 @@ public class TestDirectStoreSplitsMerges {
|
|||
mergeFileFromRegion(mergeRegionFs, first, file);
|
||||
//merge file from second region
|
||||
file = (HStoreFile) second.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
mergeFileFromRegion(mergeRegionFs, second, file);
|
||||
mergeRegionFs.commitMergedRegion();
|
||||
List<Path> mergedFiles = new ArrayList<>();
|
||||
mergedFiles.add(mergeFileFromRegion(mergeRegionFs, second, file));
|
||||
MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
|
||||
getMasterProcedureExecutor().getEnvironment();
|
||||
mergeRegionFs.commitMergedRegion(mergedFiles, env);
|
||||
}
|
||||
|
||||
private void waitForSplitProcComplete(int attempts, int waitTime) throws Exception {
|
||||
|
@ -223,11 +234,12 @@ public class TestDirectStoreSplitsMerges {
|
|||
}
|
||||
}
|
||||
|
||||
private void mergeFileFromRegion(HRegionFileSystem regionFS, HRegion regionToMerge,
|
||||
private Path mergeFileFromRegion(HRegionFileSystem regionFS, HRegion regionToMerge,
|
||||
HStoreFile file) throws IOException {
|
||||
Path mergedFile = regionFS.mergeStoreFile(regionToMerge.getRegionInfo(),
|
||||
Bytes.toString(FAMILY_NAME), file);
|
||||
validateResultingFile(regionToMerge.getRegionInfo().getEncodedName(), mergedFile);
|
||||
return mergedFile;
|
||||
}
|
||||
|
||||
private void validateResultingFile(String originalRegion, Path result){
|
||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -49,12 +50,14 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
|
@ -69,6 +72,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileInfo;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterFactory;
|
||||
|
@ -1060,7 +1065,19 @@ public class TestHStoreFile {
|
|||
if (null == path) {
|
||||
return null;
|
||||
}
|
||||
Path regionDir = regionFs.commitDaughterRegion(hri);
|
||||
List<Path> splitFiles = new ArrayList<>();
|
||||
splitFiles.add(path);
|
||||
MasterProcedureEnv mockEnv = mock(MasterProcedureEnv.class);
|
||||
MasterServices mockServices = mock(MasterServices.class);
|
||||
when(mockEnv.getMasterServices()).thenReturn(mockServices);
|
||||
when(mockEnv.getMasterConfiguration()).thenReturn(new Configuration());
|
||||
TableDescriptors mockTblDescs = mock(TableDescriptors.class);
|
||||
when(mockServices.getTableDescriptors()).thenReturn(mockTblDescs);
|
||||
TableDescriptor mockTblDesc = mock(TableDescriptor.class);
|
||||
when(mockTblDescs.get(any())).thenReturn(mockTblDesc);
|
||||
ColumnFamilyDescriptor mockCfDesc = mock(ColumnFamilyDescriptor.class);
|
||||
when(mockTblDesc.getColumnFamily(any())).thenReturn(mockCfDesc);
|
||||
Path regionDir = regionFs.commitDaughterRegion(hri, splitFiles, mockEnv);
|
||||
return new Path(new Path(regionDir, family), path.getName());
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,262 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.
|
||||
TRACK_IMPL;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.lang3.mutable.MutableBoolean;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
|
||||
@Category({RegionServerTests.class, LargeTests.class})
|
||||
public class TestMergesSplitsAddToTracker {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMergesSplitsAddToTracker.class);
|
||||
|
||||
private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
|
||||
|
||||
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().set(TRACK_IMPL, TestStoreFileTracker.class.getName());
|
||||
TEST_UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup(){
|
||||
TestStoreFileTracker.trackedFiles = new HashMap<>();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitDaughterRegion() throws Exception {
|
||||
TableName table = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||
//first put some data in order to have a store file created
|
||||
putThreeRowsAndFlush(table);
|
||||
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||
HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
|
||||
RegionInfo daughterA =
|
||||
RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey()).
|
||||
setEndKey(Bytes.toBytes("002")).setSplit(false).
|
||||
setRegionId(region.getRegionInfo().getRegionId() +
|
||||
EnvironmentEdgeManager.currentTime()).
|
||||
build();
|
||||
RegionInfo daughterB = RegionInfoBuilder.newBuilder(table).setStartKey(Bytes.toBytes("002"))
|
||||
.setEndKey(region.getRegionInfo().getEndKey()).setSplit(false)
|
||||
.setRegionId(region.getRegionInfo().getRegionId()).build();
|
||||
HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
List<Path> splitFilesA = new ArrayList<>();
|
||||
splitFilesA.add(regionFS
|
||||
.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
|
||||
Bytes.toBytes("002"), false, region.getSplitPolicy()));
|
||||
List<Path> splitFilesB = new ArrayList<>();
|
||||
splitFilesB.add(regionFS
|
||||
.splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file,
|
||||
Bytes.toBytes("002"), true, region.getSplitPolicy()));
|
||||
MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
|
||||
getMasterProcedureExecutor().getEnvironment();
|
||||
Path resultA = regionFS.commitDaughterRegion(daughterA, splitFilesA, env);
|
||||
Path resultB = regionFS.commitDaughterRegion(daughterB, splitFilesB, env);
|
||||
FileSystem fs = regionFS.getFileSystem();
|
||||
verifyFilesAreTracked(resultA, fs);
|
||||
verifyFilesAreTracked(resultB, fs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitMergedRegion() throws Exception {
|
||||
TableName table = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||
//splitting the table first
|
||||
TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
|
||||
//Add data and flush to create files in the two different regions
|
||||
putThreeRowsAndFlush(table);
|
||||
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
||||
HRegion first = regions.get(0);
|
||||
HRegion second = regions.get(1);
|
||||
HRegionFileSystem regionFS = first.getRegionFileSystem();
|
||||
|
||||
RegionInfo mergeResult =
|
||||
RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey())
|
||||
.setEndKey(second.getRegionInfo().getEndKey()).setSplit(false)
|
||||
.setRegionId(first.getRegionInfo().getRegionId() +
|
||||
EnvironmentEdgeManager.currentTime()).build();
|
||||
|
||||
HRegionFileSystem mergeFS = HRegionFileSystem.createRegionOnFileSystem(
|
||||
TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(),
|
||||
regionFS.getFileSystem(), regionFS.getTableDir(), mergeResult);
|
||||
|
||||
List<Path> mergedFiles = new ArrayList<>();
|
||||
//merge file from first region
|
||||
mergedFiles.add(mergeFileFromRegion(first, mergeFS));
|
||||
//merge file from second region
|
||||
mergedFiles.add(mergeFileFromRegion(second, mergeFS));
|
||||
MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
|
||||
getMasterProcedureExecutor().getEnvironment();
|
||||
mergeFS.commitMergedRegion(mergedFiles, env);
|
||||
//validate
|
||||
FileSystem fs = first.getRegionFileSystem().getFileSystem();
|
||||
Path finalMergeDir = new Path(first.getRegionFileSystem().getTableDir(),
|
||||
mergeResult.getEncodedName());
|
||||
verifyFilesAreTracked(finalMergeDir, fs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitLoadsFromTracker() throws Exception {
|
||||
TableName table = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
||||
//Add data and flush to create files in the two different regions
|
||||
putThreeRowsAndFlush(table);
|
||||
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||
Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(region);
|
||||
StoreFileInfo fileInfo = copyResult.getFirst();
|
||||
String copyName = copyResult.getSecond();
|
||||
//Now splits the region
|
||||
TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
|
||||
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
||||
HRegion first = regions.get(0);
|
||||
validateDaughterRegionsFiles(first, fileInfo.getActiveFileName(), copyName);
|
||||
HRegion second = regions.get(1);
|
||||
validateDaughterRegionsFiles(second, fileInfo.getActiveFileName(), copyName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeLoadsFromTracker() throws Exception {
|
||||
TableName table = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(table, new byte[][]{FAMILY_NAME},
|
||||
new byte[][]{Bytes.toBytes("002")});
|
||||
//Add data and flush to create files in the two different regions
|
||||
putThreeRowsAndFlush(table);
|
||||
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
||||
HRegion first = regions.get(0);
|
||||
Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(first);
|
||||
StoreFileInfo fileInfo = copyResult.getFirst();
|
||||
String copyName = copyResult.getSecond();
|
||||
//Now merges the first two regions
|
||||
TEST_UTIL.getAdmin().mergeRegionsAsync(new byte[][]{
|
||||
first.getRegionInfo().getEncodedNameAsBytes(),
|
||||
regions.get(1).getRegionInfo().getEncodedNameAsBytes()
|
||||
}, true).get(10, TimeUnit.SECONDS);
|
||||
regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
||||
HRegion merged = regions.get(0);
|
||||
validateDaughterRegionsFiles(merged, fileInfo.getActiveFileName(), copyName);
|
||||
}
|
||||
|
||||
private Pair<StoreFileInfo,String> copyFileInTheStoreDir(HRegion region) throws IOException {
|
||||
Path storeDir = region.getRegionFileSystem().getStoreDir("info");
|
||||
//gets the single file
|
||||
StoreFileInfo fileInfo = region.getRegionFileSystem().getStoreFiles("info").get(0);
|
||||
//make a copy of the valid file staight into the store dir, so that it's not tracked.
|
||||
String copyName = UUID.randomUUID().toString().replaceAll("-", "");
|
||||
Path copy = new Path(storeDir, copyName);
|
||||
FileUtil.copy(region.getFilesystem(), fileInfo.getFileStatus(), region.getFilesystem(),
|
||||
copy , false, false, TEST_UTIL.getConfiguration());
|
||||
return new Pair<>(fileInfo, copyName);
|
||||
}
|
||||
|
||||
private void validateDaughterRegionsFiles(HRegion region, String orignalFileName,
|
||||
String untrackedFile) throws IOException {
|
||||
//verify there's no link for the untracked, copied file in first region
|
||||
List<StoreFileInfo> infos = region.getRegionFileSystem().getStoreFiles("info");
|
||||
final MutableBoolean foundLink = new MutableBoolean(false);
|
||||
infos.stream().forEach(i -> {
|
||||
i.getActiveFileName().contains(orignalFileName);
|
||||
if(i.getActiveFileName().contains(untrackedFile)){
|
||||
fail();
|
||||
}
|
||||
if(i.getActiveFileName().contains(orignalFileName)){
|
||||
foundLink.setTrue();
|
||||
}
|
||||
});
|
||||
assertTrue(foundLink.booleanValue());
|
||||
}
|
||||
|
||||
private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception {
|
||||
String storeId = regionDir.getName() + "-info";
|
||||
for(FileStatus f : fs.listStatus(new Path(regionDir, Bytes.toString(FAMILY_NAME)))){
|
||||
assertTrue(TestStoreFileTracker.trackedFiles.get(storeId).stream().filter(s ->
|
||||
s.getPath().equals(f.getPath())).findFirst().isPresent());
|
||||
}
|
||||
}
|
||||
|
||||
private Path mergeFileFromRegion(HRegion regionToMerge, HRegionFileSystem mergeFS)
|
||||
throws IOException {
|
||||
HStoreFile file = (HStoreFile) regionToMerge.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
|
||||
return mergeFS.mergeStoreFile(regionToMerge.getRegionInfo(), Bytes.toString(FAMILY_NAME), file);
|
||||
}
|
||||
|
||||
private void putThreeRowsAndFlush(TableName table) throws IOException {
|
||||
Table tbl = TEST_UTIL.getConnection().getTable(table);
|
||||
Put put = new Put(Bytes.toBytes("001"));
|
||||
byte[] qualifier = Bytes.toBytes("1");
|
||||
put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(1));
|
||||
tbl.put(put);
|
||||
put = new Put(Bytes.toBytes("002"));
|
||||
put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
|
||||
tbl.put(put);
|
||||
put = new Put(Bytes.toBytes("003"));
|
||||
put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
|
||||
tbl.put(put);
|
||||
TEST_UTIL.flush(table);
|
||||
}
|
||||
}
|
|
@ -120,6 +120,7 @@ public class TestStripeStoreEngine {
|
|||
private static TestStoreEngine createEngine(Configuration conf) throws Exception {
|
||||
HRegion region = mock(HRegion.class);
|
||||
HStore store = mock(HStore.class);
|
||||
store.conf = conf;
|
||||
when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
|
||||
when(store.getHRegion()).thenReturn(region);
|
||||
CellComparatorImpl kvComparator = mock(CellComparatorImpl.class);
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.storefiletracker;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreContext;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class TestStoreFileTracker extends DefaultStoreFileTracker {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileTracker.class);
|
||||
public static Map<String, List<StoreFileInfo>> trackedFiles = new HashMap<>();
|
||||
private String storeId;
|
||||
|
||||
public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
|
||||
super(conf, isPrimaryReplica, ctx);
|
||||
this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString();
|
||||
LOG.info("created storeId: {}", storeId);
|
||||
trackedFiles.computeIfAbsent(storeId, v -> new ArrayList<>());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
|
||||
LOG.info("adding to storeId: {}", storeId);
|
||||
trackedFiles.get(storeId).addAll(newFiles);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<StoreFileInfo> load() throws IOException {
|
||||
return trackedFiles.get(storeId);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue