diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 85785e23b87..b9a977f437d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -120,6 +120,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; @@ -150,6 +151,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; @@ -813,6 +816,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.htableDescriptor.getDurability() == Durability.USE_DEFAULT ? defaultDurability : this.htableDescriptor.getDurability(); + decorateRegionConfiguration(conf); if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -8668,4 +8672,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi requestFlush0(tracker); } + /** + * This method modifies the region's configuration in order to inject replication-related + * features + * @param conf region configurations + */ + static void decorateRegionConfiguration(Configuration conf) { + if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { + String plugins = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,""); + String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName(); + if (!plugins.contains(replicationCoprocessorClass)) { + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + (plugins.equals("") ? "" : (plugins + ",")) + replicationCoprocessorClass); + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index aec94d41552..02815c532af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -131,9 +131,7 @@ import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.Superusers; @@ -545,7 +543,6 @@ public class HRegionServer extends HasThread implements checkCodecs(this.conf); this.userProvider = UserProvider.instantiate(conf); FSUtils.setupShortCircuitRead(this.conf); - decorateRegionServerConfiguration(this.conf); // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); @@ -3784,22 +3781,6 @@ public class HRegionServer extends HasThread implements } } - /** - * This method modifies the region server's configuration in order to inject replication-related - * features - * @param conf region server configurations - */ - static void decorateRegionServerConfiguration(Configuration conf) { - if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { - String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, ""); - String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName(); - if (!plugins.contains(rsCoprocessorClass)) { - conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, - plugins + "," + rsCoprocessorClass); - } - } - } - public boolean isShutDown() { return shutDown; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 342e8208a47..2078212a738 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -114,6 +114,7 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.filter.BigDecimalComparator; import org.apache.hadoop.hbase.filter.BinaryComparator; @@ -140,6 +141,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -6209,6 +6211,29 @@ public class TestHRegion { Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate); } + @Test + public void testBulkLoadReplicationEnabled() throws IOException { + TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42); + final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); + + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); + htd.addFamily(new HColumnDescriptor(fam1)); + HRegionInfo hri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); + region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), + rss, null); + + assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false)); + String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); + String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName(); + assertTrue(plugins.contains(replicationCoprocessorClass)); + assertTrue(region.getCoprocessorHost(). + getCoprocessors().contains(ReplicationObserver.class.getSimpleName())); + + region.close(); + } + /** * The same as HRegion class, the only difference is that instantiateHStore will * create a different HStore - HStoreForTesting. [HBASE-8518]