HBASE-26224 Introduce a MigrationStoreFileTracker to support migrating from different store file tracker implementations (#3656)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
parent 6e053765e8
commit 090b2fecf4
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.storefiletracker;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
@@ -39,7 +40,9 @@ class DefaultStoreFileTracker extends StoreFileTrackerBase {
 
   @Override
   public List<StoreFileInfo> load() throws IOException {
-    return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
+    List<StoreFileInfo> files =
+      ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
+    return files != null ? files : Collections.emptyList();
   }
 
   @Override
@@ -57,4 +60,9 @@ class DefaultStoreFileTracker extends StoreFileTrackerBase {
       Collection<StoreFileInfo> newFiles) throws IOException {
     // NOOP
   }
+
+  @Override
+  void set(List<StoreFileInfo> files) {
+    // NOOP
+  }
 }
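Note: HRegionFileSystem.getStoreFiles() can return null when the family directory has nothing in it, so load() now normalizes that to an immutable empty list and callers can iterate unconditionally. A self-contained sketch of the same guard (an illustrative helper class, not part of the commit):

    import java.util.Collections;
    import java.util.List;

    final class NullSafeLists {
      // Normalize a possibly-null listing to an empty list, as
      // DefaultStoreFileTracker.load() now does.
      static <T> List<T> orEmpty(List<T> list) {
        return list != null ? list : Collections.emptyList();
      }
    }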
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java

@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.
  * storages.
  */
 @InterfaceAudience.Private
-public class FileBasedStoreFileTracker extends StoreFileTrackerBase {
+class FileBasedStoreFileTracker extends StoreFileTrackerBase {
 
   private final StoreFileListFile backedFile;
 
@@ -139,4 +139,17 @@ public class FileBasedStoreFileTracker extends StoreFileTrackerBase {
       }
     }
   }
+
+  @Override
+  void set(List<StoreFileInfo> files) throws IOException {
+    synchronized (storefiles) {
+      storefiles.clear();
+      StoreFileList.Builder builder = StoreFileList.newBuilder();
+      for (StoreFileInfo info : files) {
+        storefiles.put(info.getPath().getName(), info);
+        builder.addStoreFile(toStoreFileEntry(info));
+      }
+      backedFile.update(builder);
+    }
+  }
 }
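Note: the class is narrowed from public to package-private, and the new set() replaces the tracked list wholesale: it clears the in-memory map and rewrites the backing file inside a single synchronized block. It exists for the migration path; paraphrased from MigrationStoreFileTracker.load() in the next file:

    List<StoreFileInfo> files = src.load(); // read the list from the source tracker
    dst.set(files);                         // mirror it into the destination tracker
    return files;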
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java (new file)

@@ -0,0 +1,88 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * A store file tracker used for migrating between store file tracker implementations.
 */
@InterfaceAudience.Private
class MigrationStoreFileTracker extends StoreFileTrackerBase {

  public static final String SRC_IMPL = "hbase.store.file-tracker.migration.src.impl";

  public static final String DST_IMPL = "hbase.store.file-tracker.migration.dst.impl";

  private final StoreFileTrackerBase src;

  private final StoreFileTrackerBase dst;

  public MigrationStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    super(conf, isPrimaryReplica, ctx);
    this.src = StoreFileTrackerFactory.create(conf, SRC_IMPL, isPrimaryReplica, ctx);
    this.dst = StoreFileTrackerFactory.create(conf, DST_IMPL, isPrimaryReplica, ctx);
    Preconditions.checkArgument(!src.getClass().equals(dst.getClass()),
      "src and dst is the same: %s", src.getClass());
  }

  @Override
  public List<StoreFileInfo> load() throws IOException {
    List<StoreFileInfo> files = src.load();
    dst.set(files);
    return files;
  }

  @Override
  protected boolean requireWritingToTmpDirFirst() {
    // Returns true if either of the two StoreFileTracker returns true.
    // For example, if we want to migrate from a tracker implementation which can ignore the broken
    // files under data directory to a tracker implementation which can not, if we still allow
    // writing in tmp directory directly, we may have some broken files under the data directory and
    // then after we finally change the implementation which can not ignore the broken files, we
    // will be in trouble.
    return src.requireWritingToTmpDirFirst() || dst.requireWritingToTmpDirFirst();
  }

  @Override
  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
    src.doAddNewStoreFiles(newFiles);
    dst.doAddNewStoreFiles(newFiles);
  }

  @Override
  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
      Collection<StoreFileInfo> newFiles) throws IOException {
    src.doAddCompactionResults(compactedFiles, newFiles);
    dst.doAddCompactionResults(compactedFiles, newFiles);
  }

  @Override
  void set(List<StoreFileInfo> files) {
    throw new UnsupportedOperationException(
      "Should not call this method on " + getClass().getSimpleName());
  }
}
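Note: a sketch of how a migration is wired up, mirroring what TestMigrationStoreFileTracker below does. The source and destination implementations come from the two keys above, and the store must be pointed at MigrationStoreFileTracker itself for the duration of the migration:

    Configuration conf = HBaseConfiguration.create();
    // which tracker to migrate from and to:
    conf.setClass(MigrationStoreFileTracker.SRC_IMPL, DefaultStoreFileTracker.class,
      StoreFileTrackerBase.class);
    conf.setClass(MigrationStoreFileTracker.DST_IMPL, FileBasedStoreFileTracker.class,
      StoreFileTrackerBase.class);
    // use the migration tracker itself while the migration is in progress:
    conf.setClass(StoreFileTrackerFactory.TRACK_IMPL, MigrationStoreFileTracker.class,
      StoreFileTracker.class);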
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java

@@ -29,7 +29,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
@@ -121,7 +120,10 @@ class StoreFileListFile {
    * We will set the timestamp in this method so just pass the builder in
    */
   void update(StoreFileList.Builder builder) throws IOException {
-    Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update");
+    if (nextTrackFile < 0) {
+      // we need to call load first to load the prevTimestamp and also the next file
+      load();
+    }
     FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
     long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
     try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
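Note: update() now loads lazily instead of asserting that load() already ran, which is why the Preconditions import disappears. The unchanged timestamp line keeps track-file timestamps strictly increasing even if the wall clock steps backwards; that rule as a tiny self-contained sketch:

    final class MonotonicTimestamp {
      private long prev = -1;

      // Same rule as update() above: never reuse or decrease a timestamp.
      long next(long now) {
        prev = Math.max(prev + 1, now);
        return prev;
      }
    }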
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.storefiletracker;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -95,8 +96,7 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
   }
 
   @Override
-  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)
-    throws IOException {
+  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
     if (!isPrimaryReplica) {
       throw new IllegalStateException("Should not call create writer on secondary replicas");
     }
@@ -170,4 +170,12 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
 
   protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
       Collection<StoreFileInfo> newFiles) throws IOException;
+
+  /**
+   * used to mirror the store file list after loading when migration.
+   * <p/>
+   * Do not add this method to the {@link StoreFileTracker} interface since we do not need this
+   * method in upper layer.
+   */
+  abstract void set(List<StoreFileInfo> files) throws IOException;
 }
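Note: set() is package-private and deliberately kept off the StoreFileTracker interface, since only the migration machinery needs it. Assuming the abstract surface is just what the hunks above show, a hypothetical minimal subclass illustrating the contract (not part of the commit):

    // Illustrative only: an in-memory tracker that implements set() by
    // replacing the whole tracked list.
    class InMemoryStoreFileTracker extends StoreFileTrackerBase {

      private final List<StoreFileInfo> files = new ArrayList<>();

      public InMemoryStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
          StoreContext ctx) {
        super(conf, isPrimaryReplica, ctx);
      }

      @Override
      public List<StoreFileInfo> load() {
        return new ArrayList<>(files);
      }

      @Override
      protected boolean requireWritingToTmpDirFirst() {
        return true;
      }

      @Override
      protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) {
        files.addAll(newFiles);
      }

      @Override
      protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
          Collection<StoreFileInfo> newFiles) {
        files.removeAll(compactedFiles);
        files.addAll(newFiles);
      }

      @Override
      void set(List<StoreFileInfo> newFiles) {
        files.clear();
        files.addAll(newFiles);
      }
    }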
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java

@@ -18,18 +18,20 @@
 package org.apache.hadoop.hbase.regionserver.storefiletracker;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
 /**
  * Factory method for creating store file tracker.
  */
@@ -50,19 +52,27 @@ public final class StoreFileTrackerFactory {
       HRegionFileSystem regionFs) {
     ColumnFamilyDescriptorBuilder fDescBuilder =
       ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
-    StoreContext ctx = StoreContext.getBuilder().
-      withColumnFamilyDescriptor(fDescBuilder.build()).
-      withRegionFileSystem(regionFs).
-      build();
-    return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
+    StoreContext ctx = StoreContext.getBuilder().withColumnFamilyDescriptor(fDescBuilder.build())
+      .withRegionFileSystem(regionFs).build();
+    return StoreFileTrackerFactory.create(conf, TRACK_IMPL, isPrimaryReplica, ctx);
   }
 
-  public static Configuration mergeConfigurations(Configuration global,
-      TableDescriptor table, ColumnFamilyDescriptor family) {
-    return new CompoundConfiguration()
-      .add(global)
-      .addBytesMap(table.getValues())
-      .addStringMap(family.getConfiguration())
-      .addBytesMap(family.getValues());
+  public static Configuration mergeConfigurations(Configuration global, TableDescriptor table,
+      ColumnFamilyDescriptor family) {
+    return StoreUtils.createStoreConfiguration(global, table, family);
+  }
+
+  static StoreFileTrackerBase create(Configuration conf, String configName,
+      boolean isPrimaryReplica, StoreContext ctx) {
+    String className =
+      Preconditions.checkNotNull(conf.get(configName), "config %s is not set", configName);
+    Class<? extends StoreFileTrackerBase> tracker;
+    try {
+      tracker = Class.forName(className).asSubclass(StoreFileTrackerBase.class);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    LOG.info("instantiating StoreFileTracker impl {} as {}", tracker.getName(), configName);
+    return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
   }
 }
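Note: the new create(conf, configName, isPrimaryReplica, ctx) overload resolves a tracker class named by an arbitrary configuration key, which is what lets MigrationStoreFileTracker build its src and dst trackers from SRC_IMPL and DST_IMPL. The resolution idiom as a self-contained sketch:

    // Self-contained sketch of the Class.forName + asSubclass idiom used above.
    static <T> Class<? extends T> resolveClass(String className, Class<T> base) {
      try {
        return Class.forName(className).asSubclass(base);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
    }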
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestMigrationStoreFileTracker.java (new file)

@@ -0,0 +1,193 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestMigrationStoreFileTracker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMigrationStoreFileTracker.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static final TableDescriptor TD =
    TableDescriptorBuilder.newBuilder(TableName.valueOf("file_based_tracker"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();

  private static final RegionInfo RI = RegionInfoBuilder.newBuilder(TD.getTableName()).build();

  @Rule
  public TestName name = new TestName();

  @Parameter(0)
  public Class<? extends StoreFileTrackerBase> srcImplClass;

  @Parameter(1)
  public Class<? extends StoreFileTrackerBase> dstImplClass;

  private HRegion region;

  private Path rootDir;

  private WAL wal;

  @Parameters(name = "{index}: src={0}, dst={1}")
  public static List<Object[]> params() {
    List<Class<? extends StoreFileTrackerBase>> impls =
      Arrays.asList(DefaultStoreFileTracker.class, FileBasedStoreFileTracker.class);
    List<Object[]> params = new ArrayList<>();
    for (Class<? extends StoreFileTrackerBase> src : impls) {
      for (Class<? extends StoreFileTrackerBase> dst : impls) {
        if (src.equals(dst)) {
          continue;
        }
        params.add(new Object[] { src, dst });
      }
    }
    return params;
  }

  @BeforeClass
  public static void setUpBeforeClass() {
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
  }

  @Before
  public void setUp() throws IOException {
    Configuration conf = UTIL.getConfiguration();
    conf.setClass(MigrationStoreFileTracker.SRC_IMPL, srcImplClass, StoreFileTrackerBase.class);
    conf.setClass(MigrationStoreFileTracker.DST_IMPL, dstImplClass, StoreFileTrackerBase.class);
    rootDir = UTIL.getDataTestDir(name.getMethodName().replaceAll("[=:\\[ ]", "_"));
    wal = HBaseTestingUtil.createWal(conf, rootDir, RI);
  }

  @After
  public void tearDown() throws IOException {
    if (region != null) {
      region.close();
    }
    Closeables.close(wal, true);
    UTIL.cleanupTestDir();
  }

  private List<String> getStoreFiles() {
    return Iterables.getOnlyElement(region.getStores()).getStorefiles().stream()
      .map(s -> s.getFileInfo().getPath().getName()).collect(Collectors.toList());
  }

  private HRegion createRegion(Class<? extends StoreFileTrackerBase> trackerImplClass)
    throws IOException {
    Configuration conf = new Configuration(UTIL.getConfiguration());
    conf.setClass(StoreFileTrackerFactory.TRACK_IMPL, trackerImplClass, StoreFileTracker.class);
    return HRegion.createHRegion(RI, rootDir, conf, TD, wal, true);
  }

  private void reopenRegion(Class<? extends StoreFileTrackerBase> trackerImplClass)
    throws IOException {
    region.flush(true);
    List<String> before = getStoreFiles();
    region.close();
    Configuration conf = new Configuration(UTIL.getConfiguration());
    conf.setClass(StoreFileTrackerFactory.TRACK_IMPL, trackerImplClass, StoreFileTracker.class);
    region = HRegion.openHRegion(rootDir, RI, TD, wal, conf);
    List<String> after = getStoreFiles();
    assertEquals(before.size(), after.size());
    assertThat(after, hasItems(before.toArray(new String[0])));
  }

  private void putData(int start, int end) throws IOException {
    for (int i = start; i < end; i++) {
      region.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      if (i % 30 == 0) {
        region.flush(true);
      }
    }
  }

  private void verifyData(int start, int end) throws IOException {
    for (int i = start; i < end; i++) {
      Result result = region.get(new Get(Bytes.toBytes(i)));
      assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
    }
  }

  @Test
  public void testMigration() throws IOException {
    region = createRegion(srcImplClass);
    putData(0, 100);
    verifyData(0, 100);
    reopenRegion(MigrationStoreFileTracker.class);
    verifyData(0, 100);
    region.compact(true);
    putData(100, 200);
    reopenRegion(dstImplClass);
    verifyData(0, 200);
  }
}