HBASE-21001 ReplicationObserver fails to load in HBase 2.0.0

This commit is contained in:
Guangxu Cheng 2018-09-07 23:42:02 +08:00
parent 24f2893059
commit c3419be003
3 changed files with 44 additions and 19 deletions

View File

@ -120,6 +120,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
@ -150,6 +151,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@ -813,6 +816,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.htableDescriptor.getDurability() == Durability.USE_DEFAULT ? defaultDurability :
this.htableDescriptor.getDurability();
decorateRegionConfiguration(conf);
if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting();
// don't initialize coprocessors if not running within a regionserver
@ -8668,4 +8672,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
requestFlush0(tracker);
}
/**
* This method modifies the region's configuration in order to inject replication-related
* features
* @param conf region configurations
*/
static void decorateRegionConfiguration(Configuration conf) {
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
String plugins = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,"");
String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
if (!plugins.contains(replicationCoprocessorClass)) {
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
(plugins.equals("") ? "" : (plugins + ",")) + replicationCoprocessorClass);
}
}
}
}

View File

@ -131,9 +131,7 @@ import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.Superusers;
@ -545,7 +543,6 @@ public class HRegionServer extends HasThread implements
checkCodecs(this.conf);
this.userProvider = UserProvider.instantiate(conf);
FSUtils.setupShortCircuitRead(this.conf);
decorateRegionServerConfiguration(this.conf);
// Disable usage of meta replicas in the regionserver
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
@ -3784,22 +3781,6 @@ public class HRegionServer extends HasThread implements
}
}
/**
* This method modifies the region server's configuration in order to inject replication-related
* features
* @param conf region server configurations
*/
static void decorateRegionServerConfiguration(Configuration conf) {
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, "");
String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName();
if (!plugins.contains(rsCoprocessorClass)) {
conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
plugins + "," + rsCoprocessorClass);
}
}
}
public boolean isShutDown() {
return shutDown;
}

View File

@ -114,6 +114,7 @@ import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.filter.BigDecimalComparator;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@ -140,6 +141,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -6209,6 +6211,29 @@ public class TestHRegion {
Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
}
@Test
public void testBulkLoadReplicationEnabled() throws IOException {
TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
htd.addFamily(new HColumnDescriptor(fam1));
HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
rss, null);
assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
assertTrue(plugins.contains(replicationCoprocessorClass));
assertTrue(region.getCoprocessorHost().
getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
region.close();
}
/**
* The same as HRegion class, the only difference is that instantiateHStore will
* create a different HStore - HStoreForTesting. [HBASE-8518]