HBASE-17543 - Create additional ReplicationEndpoint WALEntryFilters by configuration
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
c937e97864
commit
3aac1b6884
|
@ -36,12 +36,7 @@ import org.apache.commons.lang.StringUtils;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
|
@ -89,7 +84,8 @@ public class ReplicationAdmin implements Closeable {
|
|||
|
||||
public static final String TNAME = "tableName";
|
||||
public static final String CFNAME = "columnFamlyName";
|
||||
|
||||
public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY
|
||||
= "hbase.replication.source.custom.walentryfilters";
|
||||
// only Global for now, can add other type
|
||||
// such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
|
||||
public static final String REPLICATIONTYPE = "replicationType";
|
||||
|
@ -203,6 +199,7 @@ public class ReplicationAdmin implements Closeable {
|
|||
if (tableCfs != null) {
|
||||
peerConfig.setTableCFsMap(tableCfs);
|
||||
}
|
||||
checkConfiguredWALEntryFilters(peerConfig);
|
||||
this.replicationPeers.addPeer(id, peerConfig);
|
||||
}
|
||||
|
||||
|
@ -212,11 +209,13 @@ public class ReplicationAdmin implements Closeable {
|
|||
* @param peerConfig configuration for the replication slave cluster
|
||||
*/
|
||||
public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
|
||||
checkConfiguredWALEntryFilters(peerConfig);
|
||||
this.replicationPeers.addPeer(id, peerConfig);
|
||||
}
|
||||
|
||||
public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException {
|
||||
checkConfiguredWALEntryFilters(peerConfig);
|
||||
this.replicationPeers.updatePeerConfig(id, peerConfig);
|
||||
}
|
||||
|
||||
|
@ -692,4 +691,22 @@ public class ReplicationAdmin implements Closeable {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException {
|
||||
String filterCSV = peerConfig.getConfiguration().
|
||||
get(REPLICATION_WALENTRYFILTER_CONFIG_KEY);
|
||||
if (filterCSV != null && !filterCSV.isEmpty()) {
|
||||
String[] filters = filterCSV.split(",");
|
||||
for (String filter : filters) {
|
||||
try {
|
||||
Class clazz = Class.forName(filter);
|
||||
Object o = clazz.newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new ReplicationException("Configured WALEntryFilter " + filter +
|
||||
" could not be created. Failing add/update " + "peer operation.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.AbstractService;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.replication.ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY;
|
||||
|
||||
/**
|
||||
* A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
|
||||
* class rather than implementing {@link ReplicationEndpoint} directly for better backwards
|
||||
|
@ -75,6 +77,20 @@ public abstract class BaseReplicationEndpoint extends AbstractService
|
|||
if (tableCfFilter != null) {
|
||||
filters.add(tableCfFilter);
|
||||
}
|
||||
if (ctx != null && ctx.getPeerConfig() != null) {
|
||||
String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY);
|
||||
if (filterNameCSV != null && !filterNameCSV.isEmpty()) {
|
||||
String[] filterNames = filterNameCSV.split(",");
|
||||
for (String filterName : filterNames) {
|
||||
try {
|
||||
Class<?> clazz = Class.forName(filterName);
|
||||
filters.add((WALEntryFilter) clazz.newInstance());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Unable to create WALEntryFilter " + filterName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
|
@ -238,9 +239,13 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
|
||||
@Test (timeout=120000)
|
||||
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
|
||||
admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
|
||||
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
||||
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
||||
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
|
||||
//test that we can create mutliple WALFilters reflectively
|
||||
rpc.getConfiguration().put(ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
|
||||
EverythingPassesWALEntryFilter.class.getName() + ","
|
||||
+ EverythingPassesWALEntryFilterSubclass.class.getName());
|
||||
admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
|
||||
// now replicate some data.
|
||||
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
|
||||
doPut(connection, Bytes.toBytes("row1"));
|
||||
|
@ -256,9 +261,31 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
});
|
||||
|
||||
Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
|
||||
//make sure our reflectively created filter is in the filter chain
|
||||
Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
|
||||
admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
|
||||
}
|
||||
|
||||
@Test (timeout=120000, expected=ReplicationException.class)
|
||||
public void testWALEntryFilterAddValidation() throws Exception {
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
||||
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
|
||||
//test that we can create mutliple WALFilters reflectively
|
||||
rpc.getConfiguration().put(ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
|
||||
"IAmNotARealWalEntryFilter");
|
||||
admin.addPeer("testWALEntryFilterAddValidation", rpc);
|
||||
}
|
||||
|
||||
@Test (timeout=120000, expected=ReplicationException.class)
|
||||
public void testWALEntryFilterUpdateValidation() throws Exception {
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
|
||||
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
|
||||
//test that we can create mutliple WALFilters reflectively
|
||||
rpc.getConfiguration().put(ReplicationAdmin.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
|
||||
"IAmNotARealWalEntryFilter");
|
||||
admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMetricsSourceBaseSourcePassthrough(){
|
||||
|
@ -487,4 +514,21 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
|
||||
private static boolean passedEntry = false;
|
||||
@Override
|
||||
public Entry filter(Entry entry) {
|
||||
passedEntry = true;
|
||||
return entry;
|
||||
}
|
||||
|
||||
public static boolean hasPassedAnEntry(){
|
||||
return passedEntry;
|
||||
}
|
||||
}
|
||||
|
||||
public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue