HBASE-26882 Backport "HBASE-26810 Add dynamic configuration support f… (#4278)
- Include HBASE-25288 that has the configuration manager registration in HMaster Signed-off-by: Ankit Singhal <ankit@apache.org>
This commit is contained in:
parent
46e8533fe7
commit
d98483018b
|
@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.client.RegionStatesCount;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
|
@ -193,6 +194,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
|
|||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Addressing;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.HBaseFsck;
|
||||
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||
|
@ -354,7 +356,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// the key is table name, the value is the number of compactions in that table.
|
||||
private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();
|
||||
|
||||
MasterCoprocessorHost cpHost;
|
||||
volatile MasterCoprocessorHost cpHost;
|
||||
|
||||
private final boolean preLoadTableDescriptors;
|
||||
|
||||
|
@ -500,11 +502,17 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
return conf.get(MASTER_HOSTNAME_KEY);
|
||||
}
|
||||
|
||||
private void registerConfigurationObservers() {
|
||||
configurationManager.registerObserver(this.rpcServices);
|
||||
configurationManager.registerObserver(this);
|
||||
}
|
||||
|
||||
// Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will
|
||||
// block in here until then.
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
registerConfigurationObservers();
|
||||
Threads.setDaemonThreadRunning(new Thread(() -> {
|
||||
try {
|
||||
int infoPort = putUpJettyServer();
|
||||
|
@ -891,14 +899,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
tableCFsUpdater.copyTableCFs();
|
||||
|
||||
if (!maintenanceMode) {
|
||||
// Add the Observer to delete quotas on table deletion before starting all CPs by
|
||||
// default with quota support, avoiding if user specifically asks to not load this Observer.
|
||||
if (QuotaUtil.isQuotaEnabled(conf)) {
|
||||
updateConfigurationForQuotasObserver(conf);
|
||||
}
|
||||
// initialize master side coprocessors before we start handling requests
|
||||
status.setStatus("Initializing master coprocessors");
|
||||
this.cpHost = new MasterCoprocessorHost(this, this.conf);
|
||||
setQuotasObserver(conf);
|
||||
initializeCoprocessorHost(conf);
|
||||
}
|
||||
|
||||
// Checking if meta needs initializing.
|
||||
|
@ -3848,4 +3851,37 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
static void setDisableBalancerChoreForTest(boolean disable) {
|
||||
disableBalancerChoreForTest = disable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConfigurationChange(Configuration newConf) {
|
||||
super.onConfigurationChange(newConf);
|
||||
// append the quotas observer back to the master coprocessor key
|
||||
setQuotasObserver(newConf);
|
||||
// update region server coprocessor if the configuration has changed.
|
||||
if (CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
|
||||
CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) && !maintenanceMode) {
|
||||
LOG.info("Update the master coprocessor(s) because the configuration has changed");
|
||||
initializeCoprocessorHost(newConf);
|
||||
}
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*/src/test/.*")
|
||||
public ConfigurationManager getConfigurationManager() {
|
||||
return configurationManager;
|
||||
}
|
||||
|
||||
private void setQuotasObserver(Configuration conf) {
|
||||
// Add the Observer to delete quotas on table deletion before starting all CPs by
|
||||
// default with quota support, avoiding if user specifically asks to not load this Observer.
|
||||
if (QuotaUtil.isQuotaEnabled(conf)) {
|
||||
updateConfigurationForQuotasObserver(conf);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeCoprocessorHost(Configuration conf) {
|
||||
// initialize master side coprocessors before we start handling requests
|
||||
this.cpHost = new MasterCoprocessorHost(this, conf);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -185,6 +185,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.HashedBytes;
|
||||
|
@ -722,7 +723,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
private final MultiVersionConcurrencyControl mvcc;
|
||||
|
||||
// Coprocessor host
|
||||
private RegionCoprocessorHost coprocessorHost;
|
||||
private volatile RegionCoprocessorHost coprocessorHost;
|
||||
|
||||
private TableDescriptor htableDescriptor = null;
|
||||
private RegionSplitPolicy splitPolicy;
|
||||
|
@ -9289,6 +9290,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
@Override
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
this.storeHotnessProtector.update(conf);
|
||||
// update coprocessorHost if the configuration has changed.
|
||||
if (CoprocessorConfigurationUtil.checkConfigurationChange(getReadOnlyConfiguration(), conf,
|
||||
CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY)) {
|
||||
LOG.info("Update the system coprocessors because the configuration has changed");
|
||||
decorateRegionConfiguration(conf);
|
||||
this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -160,6 +160,7 @@ import org.apache.hadoop.hbase.util.Addressing;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.CompressionTest;
|
||||
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
@ -506,7 +507,7 @@ public class HRegionServer extends Thread implements
|
|||
// chore for refreshing store files for secondary regions
|
||||
private StorefileRefresherChore storefileRefresher;
|
||||
|
||||
private RegionServerCoprocessorHost rsHost;
|
||||
private volatile RegionServerCoprocessorHost rsHost;
|
||||
|
||||
private RegionServerProcedureManagerHost rspmHost;
|
||||
|
||||
|
@ -3780,6 +3781,13 @@ public class HRegionServer extends Thread implements
|
|||
} catch (IOException e) {
|
||||
LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
|
||||
}
|
||||
|
||||
// update region server coprocessor if the configuration has changed.
|
||||
if (CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
|
||||
CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)) {
|
||||
LOG.info("Update region server coprocessors because the configuration has changed");
|
||||
this.rsHost = new RegionServerCoprocessorHost(this, newConf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Helper class for coprocessor host when configuration changes.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class CoprocessorConfigurationUtil {
|
||||
|
||||
private CoprocessorConfigurationUtil() {
|
||||
}
|
||||
|
||||
public static boolean checkConfigurationChange(Configuration oldConfig, Configuration newConfig,
|
||||
String... configurationKey) {
|
||||
Preconditions.checkArgument(configurationKey != null, "Configuration Key(s) must be provided");
|
||||
boolean isConfigurationChange = false;
|
||||
for (String key : configurationKey) {
|
||||
String oldValue = oldConfig.get(key);
|
||||
String newValue = newConfig.get(key);
|
||||
// check if the coprocessor key has any difference
|
||||
if (!StringUtils.equalsIgnoreCase(oldValue, newValue)) {
|
||||
isConfigurationChange = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return isConfigurationChange;
|
||||
}
|
||||
}
|
|
@ -52,6 +52,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -121,6 +122,9 @@ import org.apache.hadoop.hbase.client.Table;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.MetaTableMetrics;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||
import org.apache.hadoop.hbase.filter.BigDecimalComparator;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
|
@ -7940,4 +7944,63 @@ public class TestHRegion {
|
|||
assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionOnCoprocessorsChange() throws IOException {
|
||||
byte[] cf1 = Bytes.toBytes("CF1");
|
||||
byte[][] families = { cf1 };
|
||||
|
||||
Configuration conf = new Configuration(CONF);
|
||||
region = initHRegion(tableName, method, conf, families);
|
||||
assertNull(region.getCoprocessorHost());
|
||||
|
||||
// set and verify the system coprocessors for region and user region
|
||||
Configuration newConf = new Configuration(conf);
|
||||
newConf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
MetaTableMetrics.class.getName());
|
||||
newConf.set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||
NoOpRegionCoprocessor.class.getName());
|
||||
// trigger configuration change
|
||||
region.onConfigurationChange(newConf);
|
||||
assertTrue(region.getCoprocessorHost() != null);
|
||||
Set<String> coprocessors = region.getCoprocessorHost().getCoprocessors();
|
||||
assertTrue(coprocessors.size() == 2);
|
||||
assertTrue(region.getCoprocessorHost().getCoprocessors()
|
||||
.contains(MetaTableMetrics.class.getSimpleName()));
|
||||
assertTrue(region.getCoprocessorHost().getCoprocessors()
|
||||
.contains(NoOpRegionCoprocessor.class.getSimpleName()));
|
||||
|
||||
// remove region coprocessor and keep only user region coprocessor
|
||||
newConf.unset(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
|
||||
region.onConfigurationChange(newConf);
|
||||
assertTrue(region.getCoprocessorHost() != null);
|
||||
coprocessors = region.getCoprocessorHost().getCoprocessors();
|
||||
assertTrue(coprocessors.size() == 1);
|
||||
assertTrue(region.getCoprocessorHost().getCoprocessors()
|
||||
.contains(NoOpRegionCoprocessor.class.getSimpleName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionOnCoprocessorsWithoutChange() throws IOException {
|
||||
byte[] cf1 = Bytes.toBytes("CF1");
|
||||
byte[][] families = { cf1 };
|
||||
|
||||
Configuration conf = new Configuration(CONF);
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
MetaTableMetrics.class.getCanonicalName());
|
||||
region = initHRegion(tableName, method, conf, families);
|
||||
// region service is null in unit test, we need to load the coprocessor once
|
||||
region.setCoprocessorHost(new RegionCoprocessorHost(region, null, conf));
|
||||
RegionCoprocessor regionCoprocessor = region.getCoprocessorHost()
|
||||
.findCoprocessor(MetaTableMetrics.class.getName());
|
||||
|
||||
// simulate when other configuration may have changed and onConfigurationChange execute once
|
||||
region.onConfigurationChange(conf);
|
||||
RegionCoprocessor regionCoprocessorAfterOnConfigurationChange = region.getCoprocessorHost()
|
||||
.findCoprocessor(MetaTableMetrics.class.getName());
|
||||
assertEquals(regionCoprocessor, regionCoprocessorAfterOnConfigurationChange);
|
||||
}
|
||||
|
||||
public static class NoOpRegionCoprocessor implements RegionCoprocessor, RegionObserver {
|
||||
// a empty region coprocessor class
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -27,10 +29,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.JMXListener;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -62,6 +67,7 @@ public class TestRegionServerOnlineConfigChange {
|
|||
|
||||
private static Table t1 = null;
|
||||
private static HRegionServer rs1 = null;
|
||||
private static HMaster hMaster = null;
|
||||
private static byte[] r1name = null;
|
||||
private static Region r1 = null;
|
||||
|
||||
|
@ -85,6 +91,7 @@ public class TestRegionServerOnlineConfigChange {
|
|||
rs1 = hbaseTestingUtility.getHBaseCluster().getRegionServer(
|
||||
hbaseTestingUtility.getHBaseCluster().getServerWith(r1name));
|
||||
r1 = rs1.getRegion(r1name);
|
||||
hMaster = hbaseTestingUtility.getHBaseCluster().getMaster();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -237,4 +244,24 @@ public class TestRegionServerOnlineConfigChange {
|
|||
.getLong(TableDescriptorBuilder.MAX_FILESIZE, -1);
|
||||
assertEquals(MAX_FILE_SIZE, actualMaxFileSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoprocessorConfigurationOnlineChange() {
|
||||
assertNull(rs1.getRegionServerCoprocessorHost().findCoprocessor(JMXListener.class.getName()));
|
||||
conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName());
|
||||
rs1.getConfigurationManager().notifyAllObservers(conf);
|
||||
assertNotNull(
|
||||
rs1.getRegionServerCoprocessorHost().findCoprocessor(JMXListener.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoprocessorConfigurationOnlineChangeOnMaster() {
|
||||
assertNull(hMaster.getMasterCoprocessorHost().findCoprocessor(JMXListener.class.getName()));
|
||||
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, JMXListener.class.getName());
|
||||
assertFalse(hMaster.isInMaintenanceMode());
|
||||
hMaster.getConfigurationManager().notifyAllObservers(conf);
|
||||
assertNotNull(
|
||||
hMaster.getMasterCoprocessorHost().findCoprocessor(JMXListener.class.getName()));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1312,6 +1312,10 @@ Here are those configurations:
|
|||
| Key
|
||||
| hbase.ipc.server.fallback-to-simple-auth-allowed
|
||||
| hbase.cleaner.scan.dir.concurrent.size
|
||||
| hbase.coprocessor.master.classes
|
||||
| hbase.coprocessor.region.classes
|
||||
| hbase.coprocessor.regionserver.classes
|
||||
| hbase.coprocessor.user.region.classes
|
||||
| hbase.regionserver.thread.compaction.large
|
||||
| hbase.regionserver.thread.compaction.small
|
||||
| hbase.regionserver.thread.split
|
||||
|
|
Loading…
Reference in New Issue