diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java index 139a38015dc..c86f33cedb2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java @@ -27,9 +27,11 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -68,9 +70,10 @@ public class ReplicationManager { } public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) - throws ReplicationException { + throws ReplicationException, IOException { checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), peerConfig.getTableCFsMap()); + checkConfiguredWALEntryFilters(peerConfig); replicationPeers.registerPeer(peerId, peerConfig); replicationPeers.peerConnected(peerId); } @@ -98,9 +101,10 @@ public class ReplicationManager { } public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) - throws ReplicationException { + throws ReplicationException, IOException { checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), peerConfig.getTableCFsMap()); + checkConfiguredWALEntryFilters(peerConfig); this.replicationPeers.updatePeerConfig(peerId, peerConfig); } @@ -146,5 +150,25 @@ public class ReplicationManager { "Table-cfs config conflict with namespaces config in peer"); } } + + + } + + private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) + throws IOException { + String filterCSV = peerConfig.getConfiguration(). + get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); + if (filterCSV != null && !filterCSV.isEmpty()){ + String [] filters = filterCSV.split(","); + for (String filter : filters) { + try { + Class clazz = Class.forName(filter); + Object o = clazz.newInstance(); + } catch (Exception e) { + throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + + " could not be created. Failing add/update " + "peer operation.", e); + } + } + } } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 48f3ac5f11a..cf141c140a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -39,6 +39,8 @@ public abstract class BaseReplicationEndpoint extends AbstractService implements ReplicationEndpoint { private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class); + public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY + = "hbase.replication.source.custom.walentryfilters"; protected Context ctx; @Override @@ -76,6 +78,20 @@ public abstract class BaseReplicationEndpoint extends AbstractService if (tableCfFilter != null) { filters.add(tableCfFilter); } + if (ctx != null && ctx.getPeerConfig() != null) { + String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY); + if (filterNameCSV != null && !filterNameCSV.isEmpty()) { + String[] filterNames = filterNameCSV.split(","); + for (String filterName : filterNames) { + try { + Class clazz = Class.forName(filterName); + filters.add((WALEntryFilter) clazz.newInstance()); + } catch (Exception e) { + LOG.error("Unable to create WALEntryFilter " + filterName, e); + } + } + } + } return filters.isEmpty() ? null : new ChainWALEntryFilter(filters); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index f4f5f245fdb..5e8d569ab12 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -225,6 +225,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { public boolean evaluate() throws Exception { return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; } + @Override public String explainFailure() throws Exception { String failure = "Failed to replicate all edits, expected = " + numEdits @@ -239,9 +240,13 @@ public class TestReplicationEndpoint extends TestReplicationBase { @Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { - admin.addPeer("testWALEntryFilterFromReplicationEndpoint", - new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) - .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null); + ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); + //test that we can create mutliple WALFilters reflectively + rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + EverythingPassesWALEntryFilter.class.getName() + + "," + EverythingPassesWALEntryFilterSubclass.class.getName()); + admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc); // now replicate some data. try (Connection connection = ConnectionFactory.createConnection(conf1)) { doPut(connection, Bytes.toBytes("row1")); @@ -257,9 +262,31 @@ public class TestReplicationEndpoint extends TestReplicationBase { }); Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); + //make sure our reflectively created filter is in the filter chain + Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); } + @Test (timeout=120000, expected=IOException.class) + public void testWALEntryFilterAddValidation() throws Exception { + ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); + //test that we can create mutliple WALFilters reflectively + rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + "IAmNotARealWalEntryFilter"); + admin.addPeer("testWALEntryFilterAddValidation", rpc); + } + + @Test (timeout=120000, expected=IOException.class) + public void testWALEntryFilterUpdateValidation() throws Exception { + ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); + //test that we can create mutliple WALFilters reflectively + rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + "IAmNotARealWalEntryFilter"); + admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc); + } + @Test public void testMetricsSourceBaseSourcePassthrough(){ @@ -488,4 +515,21 @@ public class TestReplicationEndpoint extends TestReplicationBase { }); } } + + public static class EverythingPassesWALEntryFilter implements WALEntryFilter { + private static boolean passedEntry = false; + @Override + public Entry filter(Entry entry) { + passedEntry = true; + return entry; + } + + public static boolean hasPassedAnEntry(){ + return passedEntry; + } + } + + public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter { + + } }