HBASE-13063 Allow to turn off memstore replication for region replicas

This commit is contained in:
Matteo Bertozzi 2015-03-10 10:02:04 +00:00
parent eeb8c846d9
commit cbf03787ea
8 changed files with 180 additions and 12 deletions

View File

@ -176,6 +176,14 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
private static final Bytes REGION_REPLICATION_KEY =
new Bytes(Bytes.toBytes(REGION_REPLICATION));
/**
* <em>INTERNAL</em> flag to indicate whether or not the memstore should be replicated
* for read-replicas (CONSISTENCY => TIMELINE).
*/
public static final String REGION_MEMSTORE_REPLICATION = "REGION_MEMSTORE_REPLICATION";
private static final Bytes REGION_MEMSTORE_REPLICATION_KEY =
new Bytes(Bytes.toBytes(REGION_MEMSTORE_REPLICATION));
/** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
@ -210,6 +218,8 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
public static final int DEFAULT_REGION_REPLICATION = 1;
public static final boolean DEFAULT_REGION_MEMSTORE_REPLICATION = true;
private final static Map<String, String> DEFAULT_VALUES
= new HashMap<String, String>();
private final static Set<Bytes> RESERVED_KEYWORDS
@ -1073,6 +1083,27 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
return this;
}
/**
* @return true if the read-replicas memstore replication is enabled.
*/
public boolean hasRegionMemstoreReplication() {
return isSomething(REGION_MEMSTORE_REPLICATION_KEY, DEFAULT_REGION_MEMSTORE_REPLICATION);
}
/**
* Enable or Disable the memstore replication from the primary region to the replicas.
* The replication will be used only for meta operations (e.g. flush, compaction, ...)
*
* @param memstoreReplication true if the new data written to the primary region
* should be replicated.
* false if the secondaries can tollerate to have new
* data only when the primary flushes the memstore.
*/
public HTableDescriptor setRegionMemstoreReplication(boolean memstoreReplication) {
setValue(REGION_MEMSTORE_REPLICATION_KEY, memstoreReplication ? TRUE : FALSE);
return this;
}
/**
* Returns all the column family names of the current table. The map of
* HTableDescriptor contains mapping of family name to HColumnDescriptors.

View File

@ -3216,6 +3216,13 @@ public class HRegionServer extends HasThread implements
return configurationManager;
}
/**
* @return Return table descriptors implementation.
*/
public TableDescriptors getTableDescriptors() {
return this.tableDescriptors;
}
/**
* Reload the configuration from disk.
*/

View File

@ -126,6 +126,15 @@ public class WALEdit implements Writable, HeapSize {
return CellUtil.matchingFamily(cell, METAFAMILY);
}
public boolean isMetaEdit() {
for (Cell cell: cells) {
if (!isMetaEditFamily(cell)) {
return false;
}
}
return true;
}
/**
* @return True when current WALEdit is created by log replay. Replication skips WALEdits from
* replay.
@ -345,7 +354,7 @@ public class WALEdit implements Writable, HeapSize {
bulkLoadDescriptor.toByteArray());
return new WALEdit().add(kv);
}
/**
* Deserialized and returns a BulkLoadDescriptor from the passed in Cell
* @param cell the key value
@ -357,4 +366,4 @@ public class WALEdit implements Writable, HeapSize {
}
return null;
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
@ -51,6 +52,7 @@ public interface ReplicationEndpoint extends Service {
class Context {
private final Configuration conf;
private final FileSystem fs;
private final TableDescriptors tableDescriptors;
private final ReplicationPeerConfig peerConfig;
private final ReplicationPeer replicationPeer;
private final String peerId;
@ -65,7 +67,8 @@ public interface ReplicationEndpoint extends Service {
final String peerId,
final UUID clusterId,
final ReplicationPeer replicationPeer,
final MetricsSource metrics) {
final MetricsSource metrics,
final TableDescriptors tableDescriptors) {
this.peerConfig = peerConfig;
this.conf = conf;
this.fs = fs;
@ -73,6 +76,7 @@ public interface ReplicationEndpoint extends Service {
this.peerId = peerId;
this.replicationPeer = replicationPeer;
this.metrics = metrics;
this.tableDescriptors = tableDescriptors;
}
public Configuration getConfiguration() {
return conf;
@ -95,6 +99,9 @@ public interface ReplicationEndpoint extends Service {
public MetricsSource getMetrics() {
return metrics;
}
public TableDescriptors getTableDescriptors() {
return tableDescriptors;
}
}
/**

View File

@ -43,9 +43,11 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
@ -94,6 +96,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private Configuration conf;
private ClusterConnection connection;
private TableDescriptors tableDescriptors;
// Reuse WALSplitter constructs as a WAL pipe
private PipelineController controller;
@ -150,6 +153,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
super.init(context);
this.conf = HBaseConfiguration.create(context.getConfiguration());
this.tableDescriptors = context.getTableDescriptors();
// HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
// We are resetting it here because we want default number of retries (35) rather than 10 times
@ -182,8 +186,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
try {
connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
this.pool = getDefaultThreadPool(conf);
outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool,
numWriterThreads, operationTimeout);
outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
connection, pool, numWriterThreads, operationTimeout);
outputSink.startWriterThreads();
super.doStart();
} catch (IOException ex) {
@ -311,12 +315,28 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
}
static class RegionReplicaOutputSink extends OutputSink {
private RegionReplicaSinkWriter sinkWriter;
private final RegionReplicaSinkWriter sinkWriter;
private final TableDescriptors tableDescriptors;
private final Cache<TableName, Boolean> memstoreReplicationEnabled;
public RegionReplicaOutputSink(PipelineController controller, EntryBuffers entryBuffers,
ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) {
public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
int numWriters, int operationTimeout) {
super(controller, entryBuffers, numWriters);
this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
this.tableDescriptors = tableDescriptors;
// A cache for the table "memstore replication enabled" flag.
// It has a default expiry of 5 sec. This means that if the table is altered
// with a different flag value, we might miss to replicate for that amount of
// time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
.getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
.expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
.initialCapacity(10)
.maximumSize(1000)
.build();
}
@Override
@ -327,6 +347,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
return;
}
// meta edits (e.g. flush) are always replicated.
// data edits (e.g. put) are replicated if the table requires them.
if (!requiresReplication(buffer.getTableName(), entries)) {
return;
}
sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
entries.get(0).getEdit().getCells().get(0).getRow(), entries);
}
@ -358,6 +384,44 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
AtomicLong getSkippedEditsCounter() {
return skippedEdits;
}
/**
* returns true if the specified entry must be replicated.
* We should always replicate meta operations (e.g. flush)
* and use the user HTD flag to decide whether or not replicate the memstore.
*/
private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
throws IOException {
// unit-tests may not the TableDescriptors, bypass the check and always replicate
if (tableDescriptors == null) return true;
Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
if (requiresReplication == null) {
// check if the table requires memstore replication
// some unit-test drop the table, so we should do a bypass check and always replicate.
HTableDescriptor htd = tableDescriptors.get(tableName);
requiresReplication = htd == null || htd.hasRegionMemstoreReplication();
memstoreReplicationEnabled.put(tableName, requiresReplication);
}
// if memstore replication is not required, check the entries.
// meta edits (e.g. flush) must be always replicated.
if (!requiresReplication) {
int skipEdits = 0;
java.util.Iterator<Entry> it = entries.iterator();
while (it.hasNext()) {
Entry entry = it.next();
if (entry.getEdit().isMetaEdit()) {
requiresReplication = true;
} else {
it.remove();
skipEdits++;
}
}
skippedEdits.addAndGet(skipEdits);
}
return requiresReplication;
}
}
static class RegionReplicaSinkWriter extends SinkWriter {

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
@ -200,7 +201,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
}
private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
SortedSet<String> walSet = wals.headSet(key);
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
@ -302,7 +303,7 @@ public class ReplicationSourceManager implements ReplicationListener {
protected Map<String, SortedSet<String>> getWALs() {
return Collections.unmodifiableMap(walsById);
}
/**
* Get a copy of the wals of the recovered sources on this rs
* @return a sorted set of wal names
@ -375,8 +376,10 @@ public class ReplicationSourceManager implements ReplicationListener {
final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
ReplicationSourceInterface src;
try {
@ -420,7 +423,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
return src;
}

View File

@ -2203,10 +2203,20 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
throws IOException {
verifyNumericRows(region, f, startRow, endRow, true);
}
public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
final boolean present) throws IOException {
for (int i = startRow; i < endRow; i++) {
String failMsg = "Failed verification of row :" + i;
byte[] data = Bytes.toBytes(String.valueOf(i));
Result result = region.get(new Get(data));
boolean hasResult = result != null && !result.isEmpty();
assertEquals(failMsg + result, present, hasResult);
if (!present) continue;
assertTrue(failMsg, result.containsColumn(f, null));
assertEquals(failMsg, result.getColumnCells(f, null).size(), 1);
Cell cell = result.getColumnLatestCell(f, null);

View File

@ -172,6 +172,11 @@ public class TestRegionReplicaReplicationEndpoint {
private void verifyReplication(TableName tableName, int regionReplication,
final int startRow, final int endRow) throws Exception {
verifyReplication(tableName, regionReplication, startRow, endRow, true);
}
private void verifyReplication(TableName tableName, int regionReplication,
final int startRow, final int endRow, final boolean present) throws Exception {
// find the regions
final HRegion[] regions = new HRegion[regionReplication];
@ -195,7 +200,7 @@ public class TestRegionReplicaReplicationEndpoint {
public boolean evaluate() throws Exception {
LOG.info("verifying replication for region replica:" + region.getRegionInfo());
try {
HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow);
HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present);
} catch(Throwable ex) {
LOG.warn("Verification from secondary region is not complete yet. Got:" + ex
+ " " + ex.getMessage());
@ -223,6 +228,38 @@ public class TestRegionReplicaReplicationEndpoint {
testRegionReplicaReplication(10);
}
@Test (timeout = 240000)
public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
int regionReplication = 3;
TableName tableName = TableName.valueOf("testRegionReplicaWithoutMemstoreReplication");
HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
htd.setRegionReplication(regionReplication);
htd.setRegionMemstoreReplication(false);
HTU.getHBaseAdmin().createTable(htd);
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
try {
// write data to the primary. The replicas should not receive the data
final int STEP = 100;
for (int i = 0; i < 3; ++i) {
final int startRow = i * STEP;
final int endRow = (i + 1) * STEP;
LOG.info("Writing data from " + startRow + " to " + endRow);
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow);
verifyReplication(tableName, regionReplication, startRow, endRow, false);
// Flush the table, now the data should show up in the replicas
LOG.info("flushing table");
HTU.flush(tableName);
verifyReplication(tableName, regionReplication, 0, endRow, true);
}
} finally {
table.close();
connection.close();
}
}
@Test (timeout = 240000)
public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
// Tests a table with region replication 3. Writes some data, and causes flushes and