HBASE-25988 Store the store file list by a file (#3578)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
Duo Zhang 2021-08-26 18:51:12 +08:00 committed by Andrew Purtell
parent 073656bf88
commit 60135108f4
9 changed files with 451 additions and 19 deletions

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
// This file contains protocol buffers that are used for store file tracker.
package hbase.pb;
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "StoreFileTrackerProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message StoreFileEntry {
required string name = 1;
required uint64 size = 2;
}
message StoreFileList {
required uint64 timestamp = 1;
repeated StoreFileEntry store_file = 2;
}

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.function.Supplier;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.HeapSize;
@ -109,6 +110,10 @@ public final class StoreContext implements HeapSize {
return coprocessorHost;
}
public TableName getTableName() {
return getRegionInfo().getTable();
}
public RegionInfo getRegionInfo() {
return regionFileSystem.getRegionInfo();
}

View File

@ -173,9 +173,9 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
return this.storeFlusher;
}
private StoreFileTracker createStoreFileTracker(HStore store) {
return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(),
store.isPrimaryReplicaStore(), store.getStoreContext());
private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
store.getStoreContext());
}
/**
@ -206,7 +206,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
this.ctx = store.getStoreContext();
this.coprocessorHost = store.getHRegion().getCoprocessorHost();
this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
this.storeFileTracker = createStoreFileTracker(store);
this.storeFileTracker = createStoreFileTracker(conf, store);
assert compactor != null && compactionPolicy != null && storeFileManager != null &&
storeFlusher != null && storeFileTracker != null;
}

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
@ -33,9 +32,9 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class DefaultStoreFileTracker extends StoreFileTrackerBase {
public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica,
public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
StoreContext ctx) {
super(conf, tableName, isPrimaryReplica, ctx);
super(conf, isPrimaryReplica, ctx);
}
@Override

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
/**
* A file based store file tracker.
* <p/>
* For this tracking way, the store file list will be persistent into a file, so we can write the
* new store files directly to the final data directory, as we will not load the broken files. This
* will greatly reduce the time for flush and compaction on some object storages as a rename is
* actual a copy on them. And it also avoid listing when loading store file list, which could also
* speed up the loading of store files as listing is also not a fast operation on most object
* storages.
*/
@InterfaceAudience.Private
public class FileBasedStoreFileTracker extends StoreFileTrackerBase {
private final StoreFileListFile backedFile;
private final Map<String, StoreFileInfo> storefiles = new HashMap<>();
public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
super(conf, isPrimaryReplica, ctx);
backedFile = new StoreFileListFile(ctx);
}
@Override
public List<StoreFileInfo> load() throws IOException {
StoreFileList list = backedFile.load();
if (list == null) {
return Collections.emptyList();
}
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
List<StoreFileInfo> infos = new ArrayList<>();
for (StoreFileEntry entry : list.getStoreFileList()) {
infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName())));
}
// In general, for primary replica, the load method should only be called once when
// initialization, so we do not need synchronized here. And for secondary replicas, though the
// load method could be called multiple times, we will never call other methods so no
// synchronized is also fine.
// But we have a refreshStoreFiles method in the Region interface, which can be called by CPs,
// and we have a RefreshHFilesEndpoint example to expose the refreshStoreFiles method as RPC, so
// for safety, let's still keep the synchronized here.
synchronized (storefiles) {
for (StoreFileInfo info : infos) {
storefiles.put(info.getPath().getName(), info);
}
}
return infos;
}
@Override
protected boolean requireWritingToTmpDirFirst() {
return false;
}
private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize())
.build();
}
@Override
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
synchronized (storefiles) {
StoreFileList.Builder builder = StoreFileList.newBuilder();
for (StoreFileInfo info : storefiles.values()) {
builder.addStoreFile(toStoreFileEntry(info));
}
for (StoreFileInfo info : newFiles) {
builder.addStoreFile(toStoreFileEntry(info));
}
backedFile.update(builder);
for (StoreFileInfo info : newFiles) {
storefiles.put(info.getPath().getName(), info);
}
}
}
@Override
protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException {
Set<String> compactedFileNames =
compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
synchronized (storefiles) {
StoreFileList.Builder builder = StoreFileList.newBuilder();
storefiles.forEach((name, info) -> {
if (compactedFileNames.contains(name)) {
return;
}
builder.addStoreFile(toStoreFileEntry(info));
});
for (StoreFileInfo info : newFiles) {
builder.addStoreFile(toStoreFileEntry(info));
}
backedFile.update(builder);
for (String name : compactedFileNames) {
storefiles.remove(name);
}
for (StoreFileInfo info : newFiles) {
storefiles.put(info.getPath().getName(), info);
}
}
}
}

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
/**
* To fully avoid listing, here we use two files for tracking. When loading, we will try to read
* both the two files, if only one exists, we will trust this one, if both exist, we will compare
* the timestamp to see which one is newer and trust that one. And we will record in memory that
* which one is trusted by us, and when we need to update the store file list, we will write to the
* other file.
* <p/>
* So in this way, we could avoid listing when we want to load the store file list file.
*/
@InterfaceAudience.Private
class StoreFileListFile {
private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);
private static final String TRACK_FILE_DIR = ".filelist";
private static final String TRACK_FILE = "f1";
private static final String TRACK_FILE_ROTATE = "f2";
private final StoreContext ctx;
private final Path trackFileDir;
private final Path[] trackFiles = new Path[2];
// this is used to make sure that we do not go backwards
private long prevTimestamp = -1;
private int nextTrackFile = -1;
StoreFileListFile(StoreContext ctx) {
this.ctx = ctx;
trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
trackFiles[0] = new Path(trackFileDir, TRACK_FILE);
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE);
}
private StoreFileList load(Path path) throws IOException {
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
byte[] bytes;
try (FSDataInputStream in = fs.open(path)) {
bytes = ByteStreams.toByteArray(in);
}
// Read all the bytes and then parse it, so we will only throw InvalidProtocolBufferException
// here. This is very important for upper layer to determine whether this is the normal case,
// where the file does not exist or is incomplete. If there is another type of exception, the
// upper layer should throw it out instead of just ignoring it, otherwise it will lead to data
// loss.
return StoreFileList.parseFrom(bytes);
}
private int select(StoreFileList[] lists) {
if (lists[0] == null) {
return 1;
}
if (lists[1] == null) {
return 0;
}
return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
}
StoreFileList load() throws IOException {
StoreFileList[] lists = new StoreFileList[2];
for (int i = 0; i < 2; i++) {
try {
lists[i] = load(trackFiles[i]);
} catch (FileNotFoundException | InvalidProtocolBufferException e) {
// this is normal case, so use info and do not log stacktrace
LOG.info("Failed to load track file {}: {}", trackFiles[i], e);
}
}
int winnerIndex = select(lists);
if (lists[winnerIndex] != null) {
nextTrackFile = 1 - winnerIndex;
prevTimestamp = lists[winnerIndex].getTimestamp();
} else {
nextTrackFile = 0;
}
return lists[winnerIndex];
}
/**
* We will set the timestamp in this method so just pass the builder in
*/
void update(StoreFileList.Builder builder) throws IOException {
Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update");
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
builder.setTimestamp(timestamp).build().writeTo(out);
}
// record timestamp
prevTimestamp = timestamp;
// rotate the file
nextTrackFile = 1 - nextTrackFile;
try {
fs.delete(trackFiles[nextTrackFile], false);
} catch (IOException e) {
// we will create new file with overwrite = true, so not a big deal here, only for speed up
// loading as we do not need to read this file when loading(we will hit FileNotFoundException)
LOG.debug("failed to delete old track file {}, not a big deal, just ignore", e);
}
}
}

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
@ -51,18 +50,14 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
protected final Configuration conf;
protected final TableName tableName;
protected final boolean isPrimaryReplica;
protected final StoreContext ctx;
private volatile boolean cacheOnWriteLogged;
protected StoreFileTrackerBase(Configuration conf, TableName tableName, boolean isPrimaryReplica,
StoreContext ctx) {
protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
this.conf = conf;
this.tableName = tableName;
this.isPrimaryReplica = isPrimaryReplica;
this.ctx = ctx;
}
@ -95,7 +90,7 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
.withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
.withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
.withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
.withTableName(tableName.getName()).withCellComparator(ctx.getComparator()).build();
.withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator()).build();
return hFileContext;
}
@ -153,7 +148,7 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
outputDir =
new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
} else {
throw new UnsupportedOperationException("not supported yet");
outputDir = ctx.getFamilyStoreDirectoryPath();
}
StoreFileWriter.Builder builder =
new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -28,8 +28,12 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class StoreFileTrackerFactory {
public static StoreFileTracker create(Configuration conf, TableName tableName,
boolean isPrimaryReplica, StoreContext ctx) {
return new DefaultStoreFileTracker(conf, tableName, isPrimaryReplica, ctx);
public static final String TRACK_IMPL = "hbase.store.file-tracker.impl";
public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
StoreContext ctx) {
Class<? extends StoreFileTracker> tracker =
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestRegionWithFileBasedStoreFileTracker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionWithFileBasedStoreFileTracker.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final byte[] CF = Bytes.toBytes("cf");
private static final byte[] CQ = Bytes.toBytes("cq");
private static final TableDescriptor TD =
TableDescriptorBuilder.newBuilder(TableName.valueOf("file_based_tracker"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
private static final RegionInfo RI = RegionInfoBuilder.newBuilder(TD.getTableName()).build();
@Rule
public TestName name = new TestName();
private HRegion region;
@Before
public void setUp() throws IOException {
Configuration conf = new Configuration(UTIL.getConfiguration());
conf.setClass(StoreFileTrackerFactory.TRACK_IMPL, FileBasedStoreFileTracker.class,
StoreFileTracker.class);
region =
HBaseTestingUtility.createRegionAndWAL(RI, UTIL.getDataTestDir(name.getMethodName()), conf, TD);
}
@After
public void tearDown() throws IOException {
if (region != null) {
HBaseTestingUtility.closeRegionAndWAL(region);
}
UTIL.cleanupTestDir();
}
@Test
public void testFlushAndCompaction() throws IOException {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int v = i * 10 + j;
region.put(new Put(Bytes.toBytes(v)).addColumn(CF, CQ, Bytes.toBytes(v)));
}
region.flush(true);
if (i % 3 == 2) {
region.compact(true);
}
}
// reopen the region, make sure the store file tracker works, i.e, we can get all the records
// back
region.close();
region = HRegion.openHRegion(region, null);
for (int i = 0; i < 100; i++) {
Result result = region.get(new Get(Bytes.toBytes(i)));
assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
}
}
}