HBASE-27775 Use a separate WAL provider for hbase:replication table (#5157)

Signed-off-by: Liangjun He <heliangjun@apache.org>
This commit is contained in:
Duo Zhang 2023-04-08 10:50:42 +08:00 committed by Duo Zhang
parent 650350a749
commit 8fe691c705
6 changed files with 260 additions and 72 deletions

View File

@ -136,4 +136,10 @@ public final class ReplicationStorageFactory {
return ReflectionUtils.newInstance(clazz, conf, tableName);
}
}
public static boolean isReplicationQueueTable(Configuration conf, TableName tableName) {
TableName replicationQueueTableName = TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
return replicationQueueTableName.equals(tableName);
}
}

View File

@ -982,12 +982,12 @@ class MetricsRegionServerWrapperImpl implements MetricsRegionServerWrapper {
lastRan = currentTime;
final WALProvider provider = regionServer.getWalFactory().getWALProvider();
final WALProvider metaProvider = regionServer.getWalFactory().getMetaWALProvider();
numWALFiles = (provider == null ? 0 : provider.getNumLogFiles())
+ (metaProvider == null ? 0 : metaProvider.getNumLogFiles());
walFileSize = (provider == null ? 0 : provider.getLogFileSize())
+ (metaProvider == null ? 0 : metaProvider.getLogFileSize());
List<WALProvider> providers = regionServer.getWalFactory().getAllWALProviders();
for (WALProvider provider : providers) {
numWALFiles += provider.getNumLogFiles();
walFileSize += provider.getLogFileSize();
}
// Copy over computed values so that no thread sees half computed values.
numStores = tempNumStores;
numStoreFiles = tempNumStoreFiles;

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.wal.WALFactory.Providers;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A lazy initialized WAL provider for holding the WALProvider for some special tables, such as
* hbase:meta, hbase:replication, etc.
*/
@InterfaceAudience.Private
class LazyInitializedWALProvider implements Closeable {
private final WALFactory factory;
private final String providerId;
private final String providerConfigName;
private final Abortable abortable;
private final AtomicReference<WALProvider> holder = new AtomicReference<>();
LazyInitializedWALProvider(WALFactory factory, String providerId, String providerConfigName,
Abortable abortable) {
this.factory = factory;
this.providerId = providerId;
this.providerConfigName = providerConfigName;
this.abortable = abortable;
}
WALProvider getProvider() throws IOException {
Configuration conf = factory.getConf();
for (;;) {
WALProvider provider = this.holder.get();
if (provider != null) {
return provider;
}
Class<? extends WALProvider> clz = null;
if (conf.get(providerConfigName) == null) {
try {
clz = conf.getClass(WALFactory.WAL_PROVIDER, Providers.defaultProvider.clazz,
WALProvider.class);
} catch (Throwable t) {
// the WAL provider should be an enum. Proceed
}
}
if (clz == null) {
clz = factory.getProviderClass(providerConfigName,
conf.get(WALFactory.WAL_PROVIDER, WALFactory.DEFAULT_WAL_PROVIDER));
}
provider = WALFactory.createProvider(clz);
provider.init(factory, conf, providerId, this.abortable);
provider.addWALActionsListener(new MetricsWAL());
if (this.holder.compareAndSet(null, provider)) {
return provider;
} else {
// someone is ahead of us, close and try again.
provider.close();
}
}
}
/**
* Get the provider if it already initialized, otherwise just return {@code null} instead of
* creating it.
*/
WALProvider getProviderNoCreate() {
return holder.get();
}
@Override
public void close() throws IOException {
WALProvider provider = this.holder.get();
if (provider != null) {
provider.close();
}
}
void shutdown() throws IOException {
WALProvider provider = this.holder.get();
if (provider != null) {
provider.shutdown();
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.wal;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
@ -28,10 +29,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@ -99,15 +102,22 @@ public class WALFactory {
public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider";
public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
static final String REPLICATION_WAL_PROVIDER_ID = "rep";
final String factoryId;
final Abortable abortable;
private final WALProvider provider;
// The meta updates are written to a different wal. If this
// regionserver holds meta regions, then this ref will be non-null.
// lazily intialized; most RegionServers don't deal with META
private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();
private final LazyInitializedWALProvider metaProvider;
// This is for avoid hbase:replication itself keeps trigger unnecessary updates to WAL file and
// generate a lot useless data, see HBASE-27775 for more details.
private final LazyInitializedWALProvider replicationProvider;
/**
* Configuration-specified WAL Reader used when a custom reader is requested
@ -144,13 +154,15 @@ public class WALFactory {
factoryId = SINGLETON_ID;
this.abortable = null;
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
this.metaProvider = null;
this.replicationProvider = null;
}
Providers getDefaultProvider() {
return Providers.defaultProvider;
}
public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
try {
Providers provider = Providers.valueOf(conf.get(key, defaultValue));
@ -246,6 +258,10 @@ public class WALFactory {
this.factoryId = factoryId;
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
this.abortable = abortable;
this.metaProvider = new LazyInitializedWALProvider(this,
AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable);
this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID,
REPLICATION_WAL_PROVIDER, this.abortable);
// end required early initialization
if (conf.getBoolean(WAL_ENABLED, true)) {
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
@ -263,19 +279,45 @@ public class WALFactory {
}
}
public Configuration getConf() {
return conf;
}
/**
* Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
* replay and edits that have gone to any wals from this factory.
*/
public void close() throws IOException {
final WALProvider metaProvider = this.metaProvider.get();
if (null != metaProvider) {
List<IOException> ioes = new ArrayList<>();
// these fields could be null if the WALFactory is created only for being used in the
// getInstance method.
if (metaProvider != null) {
try {
metaProvider.close();
} catch (IOException e) {
ioes.add(e);
}
// close is called on a WALFactory with null provider in the case of contention handling
// within the getInstance method.
if (null != provider) {
}
if (replicationProvider != null) {
try {
replicationProvider.close();
} catch (IOException e) {
ioes.add(e);
}
}
if (provider != null) {
try {
provider.close();
} catch (IOException e) {
ioes.add(e);
}
}
if (!ioes.isEmpty()) {
IOException ioe = new IOException("Failed to close WALFactory");
for (IOException e : ioes) {
ioe.addSuppressed(e);
}
throw ioe;
}
}
@ -285,18 +327,36 @@ public class WALFactory {
* if you can as it will try to leave things as tidy as possible.
*/
public void shutdown() throws IOException {
IOException exception = null;
final WALProvider metaProvider = this.metaProvider.get();
if (null != metaProvider) {
List<IOException> ioes = new ArrayList<>();
// these fields could be null if the WALFactory is created only for being used in the
// getInstance method.
if (metaProvider != null) {
try {
metaProvider.shutdown();
} catch (IOException ioe) {
exception = ioe;
} catch (IOException e) {
ioes.add(e);
}
}
if (replicationProvider != null) {
try {
replicationProvider.shutdown();
} catch (IOException e) {
ioes.add(e);
}
}
if (provider != null) {
try {
provider.shutdown();
if (null != exception) {
throw exception;
} catch (IOException e) {
ioes.add(e);
}
}
if (!ioes.isEmpty()) {
IOException ioe = new IOException("Failed to shutdown WALFactory");
for (IOException e : ioes) {
ioe.addSuppressed(e);
}
throw ioe;
}
}
@ -304,38 +364,16 @@ public class WALFactory {
return provider.getWALs();
}
/**
* Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
* creating the first hbase:meta WAL so we can register a listener.
* @see #getMetaWALProvider()
*/
public WALProvider getMetaProvider() throws IOException {
for (;;) {
WALProvider provider = this.metaProvider.get();
if (provider != null) {
return provider;
}
Class<? extends WALProvider> clz = null;
if (conf.get(META_WAL_PROVIDER) == null) {
try {
clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
} catch (Throwable t) {
// the WAL provider should be an enum. Proceed
}
}
if (clz == null) {
clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
}
provider = createProvider(clz);
provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable);
provider.addWALActionsListener(new MetricsWAL());
if (metaProvider.compareAndSet(null, provider)) {
return provider;
} else {
// someone is ahead of us, close and try again.
provider.close();
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
WALProvider getMetaProvider() throws IOException {
return metaProvider.getProvider();
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
WALProvider getReplicationProvider() throws IOException {
return replicationProvider.getProvider();
}
/**
@ -343,15 +381,15 @@ public class WALFactory {
*/
public WAL getWAL(RegionInfo region) throws IOException {
// Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
if (
region != null && region.isMetaRegion()
&& region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID
) {
return getMetaProvider().getWAL(region);
} else {
return provider.getWAL(region);
if (region != null && RegionReplicaUtil.isDefaultReplica(region)) {
if (region.isMetaRegion()) {
return metaProvider.getProvider().getWAL(region);
} else if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) {
return replicationProvider.getProvider().getWAL(region);
}
}
return provider.getWAL(region);
}
public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
return createStreamReader(fs, path, (CancelableProgressable) null);
@ -527,16 +565,28 @@ public class WALFactory {
return FSHLogProvider.createWriter(configuration, fs, path, false);
}
public final WALProvider getWALProvider() {
public WALProvider getWALProvider() {
return this.provider;
}
/**
* @return Current metaProvider... may be null if not yet initialized.
* @see #getMetaProvider()
* Returns all the wal providers, for example, the default one, the one for hbase:meta and the one
* for hbase:replication.
*/
public final WALProvider getMetaWALProvider() {
return this.metaProvider.get();
public List<WALProvider> getAllWALProviders() {
List<WALProvider> providers = new ArrayList<>();
if (provider != null) {
providers.add(provider);
}
WALProvider meta = metaProvider.getProviderNoCreate();
if (meta != null) {
providers.add(meta);
}
WALProvider replication = replicationProvider.getProviderNoCreate();
if (replication != null) {
providers.add(replication);
}
return providers;
}
public ExcludeDatanodeManager getExcludeDatanodeManager() {

View File

@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@ -108,7 +107,6 @@ public class TestMultiSlaveReplication {
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
utility1.setZkCluster(miniZK);
new ZKWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
@ -118,11 +116,9 @@ public class TestMultiSlaveReplication {
utility2 = new HBaseTestingUtil(conf2);
utility2.setZkCluster(miniZK);
new ZKWatcher(conf2, "cluster2", null, true);
utility3 = new HBaseTestingUtil(conf3);
utility3.setZkCluster(miniZK);
new ZKWatcher(conf3, "cluster3", null, true);
table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
@ -133,7 +129,7 @@ public class TestMultiSlaveReplication {
@Test
public void testMultiSlaveReplication() throws Exception {
LOG.info("testCyclicReplication");
SingleProcessHBaseCluster master = utility1.startMiniCluster();
utility1.startMiniCluster();
utility2.startMiniCluster();
utility3.startMiniCluster();
try (Connection conn = ConnectionFactory.createConnection(conf1);

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -708,6 +710,32 @@ public class TestWALFactory {
assertEquals(IOTestProvider.class, metaWALProvider.getClass());
}
@Test
public void testCustomReplicationProvider() throws IOException {
final Configuration config = new Configuration();
config.set(WALFactory.REPLICATION_WAL_PROVIDER, IOTestProvider.class.getName());
final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
Class<? extends WALProvider> walProvider =
walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
assertEquals(Providers.filesystem.clazz, walProvider);
WALProvider replicationWALProvider = walFactory.getReplicationProvider();
assertEquals(IOTestProvider.class, replicationWALProvider.getClass());
}
/**
* Confirm that we will use different WALs for hbase:meta and hbase:replication
*/
@Test
public void testDifferentWALs() throws IOException {
WAL normalWAL = wals.getWAL(null);
WAL metaWAL = wals.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO);
WAL replicationWAL = wals.getWAL(RegionInfoBuilder
.newBuilder(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT).build());
assertNotSame(normalWAL, metaWAL);
assertNotSame(normalWAL, replicationWAL);
assertNotSame(metaWAL, replicationWAL);
}
@Test
public void testReaderClosedOnBadCodec() throws IOException {
// Create our own Configuration and WALFactory to avoid breaking other test methods