HBASE-17543 - Create additional ReplicationEndpoint WALEntryFilters by configuration
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
ae21797305
commit
5ebaadf1a6
|
@ -27,9 +27,11 @@ import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
@ -68,9 +70,10 @@ public class ReplicationManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
|
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||||
throws ReplicationException {
|
throws ReplicationException, IOException {
|
||||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
||||||
peerConfig.getTableCFsMap());
|
peerConfig.getTableCFsMap());
|
||||||
|
checkConfiguredWALEntryFilters(peerConfig);
|
||||||
replicationPeers.registerPeer(peerId, peerConfig);
|
replicationPeers.registerPeer(peerId, peerConfig);
|
||||||
replicationPeers.peerConnected(peerId);
|
replicationPeers.peerConnected(peerId);
|
||||||
}
|
}
|
||||||
|
@ -98,9 +101,10 @@ public class ReplicationManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||||
throws ReplicationException {
|
throws ReplicationException, IOException {
|
||||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
||||||
peerConfig.getTableCFsMap());
|
peerConfig.getTableCFsMap());
|
||||||
|
checkConfiguredWALEntryFilters(peerConfig);
|
||||||
this.replicationPeers.updatePeerConfig(peerId, peerConfig);
|
this.replicationPeers.updatePeerConfig(peerId, peerConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,5 +150,25 @@ public class ReplicationManager {
|
||||||
"Table-cfs config conflict with namespaces config in peer");
|
"Table-cfs config conflict with namespaces config in peer");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
|
||||||
|
throws IOException {
|
||||||
|
String filterCSV = peerConfig.getConfiguration().
|
||||||
|
get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
|
||||||
|
if (filterCSV != null && !filterCSV.isEmpty()){
|
||||||
|
String [] filters = filterCSV.split(",");
|
||||||
|
for (String filter : filters) {
|
||||||
|
try {
|
||||||
|
Class clazz = Class.forName(filter);
|
||||||
|
Object o = clazz.newInstance();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
|
||||||
|
" could not be created. Failing add/update " + "peer operation.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -39,6 +39,8 @@ public abstract class BaseReplicationEndpoint extends AbstractService
|
||||||
implements ReplicationEndpoint {
|
implements ReplicationEndpoint {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class);
|
private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class);
|
||||||
|
public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY
|
||||||
|
= "hbase.replication.source.custom.walentryfilters";
|
||||||
protected Context ctx;
|
protected Context ctx;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -76,6 +78,20 @@ public abstract class BaseReplicationEndpoint extends AbstractService
|
||||||
if (tableCfFilter != null) {
|
if (tableCfFilter != null) {
|
||||||
filters.add(tableCfFilter);
|
filters.add(tableCfFilter);
|
||||||
}
|
}
|
||||||
|
if (ctx != null && ctx.getPeerConfig() != null) {
|
||||||
|
String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY);
|
||||||
|
if (filterNameCSV != null && !filterNameCSV.isEmpty()) {
|
||||||
|
String[] filterNames = filterNameCSV.split(",");
|
||||||
|
for (String filterName : filterNames) {
|
||||||
|
try {
|
||||||
|
Class<?> clazz = Class.forName(filterName);
|
||||||
|
filters.add((WALEntryFilter) clazz.newInstance());
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Unable to create WALEntryFilter " + filterName, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
|
return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -225,6 +225,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
|
return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String explainFailure() throws Exception {
|
public String explainFailure() throws Exception {
|
||||||
String failure = "Failed to replicate all edits, expected = " + numEdits
|
String failure = "Failed to replicate all edits, expected = " + numEdits
|
||||||
|
@ -239,9 +240,13 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
|
|
||||||
@Test (timeout=120000)
|
@Test (timeout=120000)
|
||||||
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
|
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
|
||||||
admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
|
ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
||||||
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
|
||||||
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
|
//test that we can create mutliple WALFilters reflectively
|
||||||
|
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
|
||||||
|
EverythingPassesWALEntryFilter.class.getName() +
|
||||||
|
"," + EverythingPassesWALEntryFilterSubclass.class.getName());
|
||||||
|
admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
|
||||||
// now replicate some data.
|
// now replicate some data.
|
||||||
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
|
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
|
||||||
doPut(connection, Bytes.toBytes("row1"));
|
doPut(connection, Bytes.toBytes("row1"));
|
||||||
|
@ -257,9 +262,31 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
});
|
});
|
||||||
|
|
||||||
Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
|
Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
|
||||||
|
//make sure our reflectively created filter is in the filter chain
|
||||||
|
Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
|
||||||
admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
|
admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout=120000, expected=IOException.class)
|
||||||
|
public void testWALEntryFilterAddValidation() throws Exception {
|
||||||
|
ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
||||||
|
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
|
||||||
|
//test that we can create mutliple WALFilters reflectively
|
||||||
|
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
|
||||||
|
"IAmNotARealWalEntryFilter");
|
||||||
|
admin.addPeer("testWALEntryFilterAddValidation", rpc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=120000, expected=IOException.class)
|
||||||
|
public void testWALEntryFilterUpdateValidation() throws Exception {
|
||||||
|
ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
||||||
|
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
|
||||||
|
//test that we can create mutliple WALFilters reflectively
|
||||||
|
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
|
||||||
|
"IAmNotARealWalEntryFilter");
|
||||||
|
admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMetricsSourceBaseSourcePassthrough(){
|
public void testMetricsSourceBaseSourcePassthrough(){
|
||||||
|
@ -488,4 +515,21 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
|
||||||
|
private static boolean passedEntry = false;
|
||||||
|
@Override
|
||||||
|
public Entry filter(Entry entry) {
|
||||||
|
passedEntry = true;
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean hasPassedAnEntry(){
|
||||||
|
return passedEntry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue