diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 76573f4c387..d462f38b171 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -36,12 +36,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; @@ -89,7 +84,8 @@ public class ReplicationAdmin implements Closeable { public static final String TNAME = "tableName"; public static final String CFNAME = "columnFamlyName"; - + public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY + = "hbase.replication.source.custom.walentryfilters"; // only Global for now, can add other type // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc. public static final String REPLICATIONTYPE = "replicationType"; @@ -203,6 +199,7 @@ public class ReplicationAdmin implements Closeable { if (tableCfs != null) { peerConfig.setTableCFsMap(tableCfs); } + checkConfiguredWALEntryFilters(peerConfig); this.replicationPeers.addPeer(id, peerConfig); } @@ -212,11 +209,13 @@ public class ReplicationAdmin implements Closeable { * @param peerConfig configuration for the replication slave cluster */ public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { + checkConfiguredWALEntryFilters(peerConfig); this.replicationPeers.addPeer(id, peerConfig); } public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { + checkConfiguredWALEntryFilters(peerConfig); this.replicationPeers.updatePeerConfig(id, peerConfig); } @@ -692,4 +691,22 @@ public class ReplicationAdmin implements Closeable { } return true; } + + private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) + throws ReplicationException { + String filterCSV = peerConfig.getConfiguration(). + get(REPLICATION_WALENTRYFILTER_CONFIG_KEY); + if (filterCSV != null && !filterCSV.isEmpty()) { + String[] filters = filterCSV.split(","); + for (String filter : filters) { + try { + Class clazz = Class.forName(filter); + Object o = clazz.newInstance(); + } catch (Exception e) { + throw new ReplicationException("Configured WALEntryFilter " + filter + + " could not be created. Failing add/update " + "peer operation.", e); + } + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index bc73f74d47a..71a222a8a49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractService; +import static org.apache.hadoop.hbase.client.replication.ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY; + /** * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this * class rather than implementing {@link ReplicationEndpoint} directly for better backwards @@ -75,6 +77,20 @@ public abstract class BaseReplicationEndpoint extends AbstractService if (tableCfFilter != null) { filters.add(tableCfFilter); } + if (ctx != null && ctx.getPeerConfig() != null) { + String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY); + if (filterNameCSV != null && !filterNameCSV.isEmpty()) { + String[] filterNames = filterNameCSV.split(","); + for (String filterName : filterNames) { + try { + Class clazz = Class.forName(filterName); + filters.add((WALEntryFilter) clazz.newInstance()); + } catch (Exception e) { + LOG.error("Unable to create WALEntryFilter " + filterName, e); + } + } + } + } return filters.isEmpty() ? null : new ChainWALEntryFilter(filters); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index b9d5582a4ae..7128adcf7f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Waiter; @@ -238,9 +239,13 @@ public class TestReplicationEndpoint extends TestReplicationBase { @Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { - admin.addPeer("testWALEntryFilterFromReplicationEndpoint", - new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) - .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null); + ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); + //test that we can create mutliple WALFilters reflectively + rpc.getConfiguration().put(ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + EverythingPassesWALEntryFilter.class.getName() + "," + + EverythingPassesWALEntryFilterSubclass.class.getName()); + admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc); // now replicate some data. try (Connection connection = ConnectionFactory.createConnection(conf1)) { doPut(connection, Bytes.toBytes("row1")); @@ -256,9 +261,31 @@ public class TestReplicationEndpoint extends TestReplicationBase { }); Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); + //make sure our reflectively created filter is in the filter chain + Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); } + @Test (timeout=120000, expected=ReplicationException.class) + public void testWALEntryFilterAddValidation() throws Exception { + ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); + //test that we can create mutliple WALFilters reflectively + rpc.getConfiguration().put(ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + "IAmNotARealWalEntryFilter"); + admin.addPeer("testWALEntryFilterAddValidation", rpc); + } + + @Test (timeout=120000, expected=ReplicationException.class) + public void testWALEntryFilterUpdateValidation() throws Exception { + ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); + //test that we can create mutliple WALFilters reflectively + rpc.getConfiguration().put(ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + "IAmNotARealWalEntryFilter"); + admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc); + } + @Test public void testMetricsSourceBaseSourcePassthrough(){ @@ -487,4 +514,21 @@ public class TestReplicationEndpoint extends TestReplicationBase { }); } } + + public static class EverythingPassesWALEntryFilter implements WALEntryFilter { + private static boolean passedEntry = false; + @Override + public Entry filter(Entry entry) { + passedEntry = true; + return entry; + } + + public static boolean hasPassedAnEntry(){ + return passedEntry; + } + } + + public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter { + + } }