HBASE-13063 Allow to turn off memstore replication for region replicas
This commit is contained in:
parent
52ff5485a8
commit
eb2193afa0
|
@ -192,6 +192,14 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
|
|||
private static final ImmutableBytesWritable REGION_REPLICATION_KEY =
|
||||
new ImmutableBytesWritable(Bytes.toBytes(REGION_REPLICATION));
|
||||
|
||||
/**
|
||||
* <em>INTERNAL</em> flag to indicate whether or not the memstore should be replicated
|
||||
* for read-replicas (CONSISTENCY => TIMELINE).
|
||||
*/
|
||||
public static final String REGION_MEMSTORE_REPLICATION = "REGION_MEMSTORE_REPLICATION";
|
||||
private static final ImmutableBytesWritable REGION_MEMSTORE_REPLICATION_KEY =
|
||||
new ImmutableBytesWritable(Bytes.toBytes(REGION_MEMSTORE_REPLICATION));
|
||||
|
||||
/** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
|
||||
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
|
||||
|
||||
|
@ -226,6 +234,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
|
|||
|
||||
public static final int DEFAULT_REGION_REPLICATION = 1;
|
||||
|
||||
public static final boolean DEFAULT_REGION_MEMSTORE_REPLICATION = true;
|
||||
|
||||
private final static Map<String, String> DEFAULT_VALUES
|
||||
= new HashMap<String, String>();
|
||||
private final static Set<ImmutableBytesWritable> RESERVED_KEYWORDS
|
||||
|
@ -1167,6 +1177,27 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the read-replicas memstore replication is enabled.
|
||||
*/
|
||||
public boolean hasRegionMemstoreReplication() {
|
||||
return isSomething(REGION_MEMSTORE_REPLICATION_KEY, DEFAULT_REGION_MEMSTORE_REPLICATION);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable or Disable the memstore replication from the primary region to the replicas.
|
||||
* The replication will be used only for meta operations (e.g. flush, compaction, ...)
|
||||
*
|
||||
* @param memstoreReplication true if the new data written to the primary region
|
||||
* should be replicated.
|
||||
* false if the secondaries can tollerate to have new
|
||||
* data only when the primary flushes the memstore.
|
||||
*/
|
||||
public HTableDescriptor setRegionMemstoreReplication(boolean memstoreReplication) {
|
||||
setValue(REGION_MEMSTORE_REPLICATION_KEY, memstoreReplication ? TRUE : FALSE);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all the column family names of the current table. The map of
|
||||
* HTableDescriptor contains mapping of family name to HColumnDescriptors.
|
||||
|
|
|
@ -3172,6 +3172,13 @@ public class HRegionServer extends HasThread implements
|
|||
return configurationManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return table descriptors implementation.
|
||||
*/
|
||||
public TableDescriptors getTableDescriptors() {
|
||||
return this.tableDescriptors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reload the configuration from disk.
|
||||
*/
|
||||
|
|
|
@ -126,6 +126,15 @@ public class WALEdit implements Writable, HeapSize {
|
|||
return CellUtil.matchingFamily(cell, METAFAMILY);
|
||||
}
|
||||
|
||||
public boolean isMetaEdit() {
|
||||
for (Cell cell: cells) {
|
||||
if (!isMetaEditFamily(cell)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True when current WALEdit is created by log replay. Replication skips WALEdits from
|
||||
* replay.
|
||||
|
@ -345,7 +354,7 @@ public class WALEdit implements Writable, HeapSize {
|
|||
bulkLoadDescriptor.toByteArray());
|
||||
return new WALEdit().add(kv);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Deserialized and returns a BulkLoadDescriptor from the passed in Cell
|
||||
* @param cell the key value
|
||||
|
@ -357,4 +366,4 @@ public class WALEdit implements Writable, HeapSize {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
|
||||
|
||||
|
@ -51,6 +52,7 @@ public interface ReplicationEndpoint extends Service {
|
|||
class Context {
|
||||
private final Configuration conf;
|
||||
private final FileSystem fs;
|
||||
private final TableDescriptors tableDescriptors;
|
||||
private final ReplicationPeerConfig peerConfig;
|
||||
private final ReplicationPeer replicationPeer;
|
||||
private final String peerId;
|
||||
|
@ -65,7 +67,8 @@ public interface ReplicationEndpoint extends Service {
|
|||
final String peerId,
|
||||
final UUID clusterId,
|
||||
final ReplicationPeer replicationPeer,
|
||||
final MetricsSource metrics) {
|
||||
final MetricsSource metrics,
|
||||
final TableDescriptors tableDescriptors) {
|
||||
this.peerConfig = peerConfig;
|
||||
this.conf = conf;
|
||||
this.fs = fs;
|
||||
|
@ -73,6 +76,7 @@ public interface ReplicationEndpoint extends Service {
|
|||
this.peerId = peerId;
|
||||
this.replicationPeer = replicationPeer;
|
||||
this.metrics = metrics;
|
||||
this.tableDescriptors = tableDescriptors;
|
||||
}
|
||||
public Configuration getConfiguration() {
|
||||
return conf;
|
||||
|
@ -95,6 +99,9 @@ public interface ReplicationEndpoint extends Service {
|
|||
public MetricsSource getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
public TableDescriptors getTableDescriptors() {
|
||||
return tableDescriptors;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -43,9 +43,11 @@ import org.apache.hadoop.hbase.HBaseIOException;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
|
@ -96,6 +98,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
|
||||
private Configuration conf;
|
||||
private ClusterConnection connection;
|
||||
private TableDescriptors tableDescriptors;
|
||||
|
||||
// Reuse WALSplitter constructs as a WAL pipe
|
||||
private PipelineController controller;
|
||||
|
@ -152,6 +155,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
super.init(context);
|
||||
|
||||
this.conf = HBaseConfiguration.create(context.getConfiguration());
|
||||
this.tableDescriptors = context.getTableDescriptors();
|
||||
|
||||
// HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
|
||||
// We are resetting it here because we want default number of retries (35) rather than 10 times
|
||||
|
@ -184,8 +188,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
try {
|
||||
connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
|
||||
this.pool = getDefaultThreadPool(conf);
|
||||
outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool,
|
||||
numWriterThreads, operationTimeout);
|
||||
outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
|
||||
connection, pool, numWriterThreads, operationTimeout);
|
||||
outputSink.startWriterThreads();
|
||||
super.doStart();
|
||||
} catch (IOException ex) {
|
||||
|
@ -313,12 +317,28 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
}
|
||||
|
||||
static class RegionReplicaOutputSink extends OutputSink {
|
||||
private RegionReplicaSinkWriter sinkWriter;
|
||||
private final RegionReplicaSinkWriter sinkWriter;
|
||||
private final TableDescriptors tableDescriptors;
|
||||
private final Cache<TableName, Boolean> memstoreReplicationEnabled;
|
||||
|
||||
public RegionReplicaOutputSink(PipelineController controller, EntryBuffers entryBuffers,
|
||||
ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) {
|
||||
public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
|
||||
EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
|
||||
int numWriters, int operationTimeout) {
|
||||
super(controller, entryBuffers, numWriters);
|
||||
this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
|
||||
this.tableDescriptors = tableDescriptors;
|
||||
|
||||
// A cache for the table "memstore replication enabled" flag.
|
||||
// It has a default expiry of 5 sec. This means that if the table is altered
|
||||
// with a different flag value, we might miss to replicate for that amount of
|
||||
// time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
|
||||
int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
|
||||
.getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
|
||||
this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
|
||||
.initialCapacity(10)
|
||||
.maximumSize(1000)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -329,6 +349,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
return;
|
||||
}
|
||||
|
||||
// meta edits (e.g. flush) are always replicated.
|
||||
// data edits (e.g. put) are replicated if the table requires them.
|
||||
if (!requiresReplication(buffer.getTableName(), entries)) {
|
||||
return;
|
||||
}
|
||||
|
||||
sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
|
||||
entries.get(0).getEdit().getCells().get(0).getRow(), entries);
|
||||
}
|
||||
|
@ -360,6 +386,44 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
AtomicLong getSkippedEditsCounter() {
|
||||
return skippedEdits;
|
||||
}
|
||||
|
||||
/**
|
||||
* returns true if the specified entry must be replicated.
|
||||
* We should always replicate meta operations (e.g. flush)
|
||||
* and use the user HTD flag to decide whether or not replicate the memstore.
|
||||
*/
|
||||
private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
|
||||
throws IOException {
|
||||
// unit-tests may not the TableDescriptors, bypass the check and always replicate
|
||||
if (tableDescriptors == null) return true;
|
||||
|
||||
Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
|
||||
if (requiresReplication == null) {
|
||||
// check if the table requires memstore replication
|
||||
// some unit-test drop the table, so we should do a bypass check and always replicate.
|
||||
HTableDescriptor htd = tableDescriptors.get(tableName);
|
||||
requiresReplication = htd == null || htd.hasRegionMemstoreReplication();
|
||||
memstoreReplicationEnabled.put(tableName, requiresReplication);
|
||||
}
|
||||
|
||||
// if memstore replication is not required, check the entries.
|
||||
// meta edits (e.g. flush) must be always replicated.
|
||||
if (!requiresReplication) {
|
||||
int skipEdits = 0;
|
||||
java.util.Iterator<Entry> it = entries.iterator();
|
||||
while (it.hasNext()) {
|
||||
Entry entry = it.next();
|
||||
if (entry.getEdit().isMetaEdit()) {
|
||||
requiresReplication = true;
|
||||
} else {
|
||||
it.remove();
|
||||
skipEdits++;
|
||||
}
|
||||
}
|
||||
skippedEdits.addAndGet(skipEdits);
|
||||
}
|
||||
return requiresReplication;
|
||||
}
|
||||
}
|
||||
|
||||
static class RegionReplicaSinkWriter extends SinkWriter {
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||
|
@ -200,7 +201,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
|
||||
SortedSet<String> walSet = wals.headSet(key);
|
||||
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
|
||||
|
@ -302,7 +303,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
protected Map<String, SortedSet<String>> getWALs() {
|
||||
return Collections.unmodifiableMap(walsById);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get a copy of the wals of the recovered sources on this rs
|
||||
* @return a sorted set of wal names
|
||||
|
@ -375,8 +376,10 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
|
||||
throws IOException {
|
||||
RegionServerCoprocessorHost rsServerHost = null;
|
||||
TableDescriptors tableDescriptors = null;
|
||||
if (server instanceof HRegionServer) {
|
||||
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
|
||||
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
|
||||
}
|
||||
ReplicationSourceInterface src;
|
||||
try {
|
||||
|
@ -420,7 +423,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
|
||||
// init replication endpoint
|
||||
replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
|
||||
fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
|
||||
fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
|
||||
|
||||
return src;
|
||||
}
|
||||
|
|
|
@ -2144,10 +2144,20 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
|
||||
public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
|
||||
throws IOException {
|
||||
verifyNumericRows(region, f, startRow, endRow, true);
|
||||
}
|
||||
|
||||
public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
|
||||
final boolean present) throws IOException {
|
||||
for (int i = startRow; i < endRow; i++) {
|
||||
String failMsg = "Failed verification of row :" + i;
|
||||
byte[] data = Bytes.toBytes(String.valueOf(i));
|
||||
Result result = region.get(new Get(data));
|
||||
|
||||
boolean hasResult = result != null && !result.isEmpty();
|
||||
assertEquals(failMsg + result, present, hasResult);
|
||||
if (!present) continue;
|
||||
|
||||
assertTrue(failMsg, result.containsColumn(f, null));
|
||||
assertEquals(failMsg, result.getColumnCells(f, null).size(), 1);
|
||||
Cell cell = result.getColumnLatestCell(f, null);
|
||||
|
|
|
@ -172,6 +172,11 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
|
||||
private void verifyReplication(TableName tableName, int regionReplication,
|
||||
final int startRow, final int endRow) throws Exception {
|
||||
verifyReplication(tableName, regionReplication, startRow, endRow, true);
|
||||
}
|
||||
|
||||
private void verifyReplication(TableName tableName, int regionReplication,
|
||||
final int startRow, final int endRow, final boolean present) throws Exception {
|
||||
// find the regions
|
||||
final HRegion[] regions = new HRegion[regionReplication];
|
||||
|
||||
|
@ -195,7 +200,7 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
public boolean evaluate() throws Exception {
|
||||
LOG.info("verifying replication for region replica:" + region.getRegionInfo());
|
||||
try {
|
||||
HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow);
|
||||
HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present);
|
||||
} catch(Throwable ex) {
|
||||
LOG.warn("Verification from secondary region is not complete yet. Got:" + ex
|
||||
+ " " + ex.getMessage());
|
||||
|
@ -223,6 +228,38 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
testRegionReplicaReplication(10);
|
||||
}
|
||||
|
||||
@Test (timeout = 240000)
|
||||
public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
|
||||
int regionReplication = 3;
|
||||
TableName tableName = TableName.valueOf("testRegionReplicaWithoutMemstoreReplication");
|
||||
HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
|
||||
htd.setRegionReplication(regionReplication);
|
||||
htd.setRegionMemstoreReplication(false);
|
||||
HTU.getHBaseAdmin().createTable(htd);
|
||||
|
||||
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
|
||||
Table table = connection.getTable(tableName);
|
||||
try {
|
||||
// write data to the primary. The replicas should not receive the data
|
||||
final int STEP = 100;
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
final int startRow = i * STEP;
|
||||
final int endRow = (i + 1) * STEP;
|
||||
LOG.info("Writing data from " + startRow + " to " + endRow);
|
||||
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow);
|
||||
verifyReplication(tableName, regionReplication, startRow, endRow, false);
|
||||
|
||||
// Flush the table, now the data should show up in the replicas
|
||||
LOG.info("flushing table");
|
||||
HTU.flush(tableName);
|
||||
verifyReplication(tableName, regionReplication, 0, endRow, true);
|
||||
}
|
||||
} finally {
|
||||
table.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 240000)
|
||||
public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
|
||||
// Tests a table with region replication 3. Writes some data, and causes flushes and
|
||||
|
|
Loading…
Reference in New Issue