diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 51352bb7aa4..2c0bd300384 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -176,6 +176,14 @@ public class HTableDescriptor implements Comparable { private static final Bytes REGION_REPLICATION_KEY = new Bytes(Bytes.toBytes(REGION_REPLICATION)); + /** + * INTERNAL flag to indicate whether or not the memstore should be replicated + * for read-replicas (CONSISTENCY => TIMELINE). + */ + public static final String REGION_MEMSTORE_REPLICATION = "REGION_MEMSTORE_REPLICATION"; + private static final Bytes REGION_MEMSTORE_REPLICATION_KEY = + new Bytes(Bytes.toBytes(REGION_MEMSTORE_REPLICATION)); + /** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */ private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT; @@ -210,6 +218,8 @@ public class HTableDescriptor implements Comparable { public static final int DEFAULT_REGION_REPLICATION = 1; + public static final boolean DEFAULT_REGION_MEMSTORE_REPLICATION = true; + private final static Map DEFAULT_VALUES = new HashMap(); private final static Set RESERVED_KEYWORDS @@ -1073,6 +1083,27 @@ public class HTableDescriptor implements Comparable { return this; } + /** + * @return true if the read-replicas memstore replication is enabled. + */ + public boolean hasRegionMemstoreReplication() { + return isSomething(REGION_MEMSTORE_REPLICATION_KEY, DEFAULT_REGION_MEMSTORE_REPLICATION); + } + + /** + * Enable or Disable the memstore replication from the primary region to the replicas. + * The replication will be used only for meta operations (e.g. flush, compaction, ...) + * + * @param memstoreReplication true if the new data written to the primary region + * should be replicated. + * false if the secondaries can tollerate to have new + * data only when the primary flushes the memstore. + */ + public HTableDescriptor setRegionMemstoreReplication(boolean memstoreReplication) { + setValue(REGION_MEMSTORE_REPLICATION_KEY, memstoreReplication ? TRUE : FALSE); + return this; + } + /** * Returns all the column family names of the current table. The map of * HTableDescriptor contains mapping of family name to HColumnDescriptors. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d4bb931763e..4b8939abdcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3216,6 +3216,13 @@ public class HRegionServer extends HasThread implements return configurationManager; } + /** + * @return Return table descriptors implementation. + */ + public TableDescriptors getTableDescriptors() { + return this.tableDescriptors; + } + /** * Reload the configuration from disk. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index 39d05362f71..5d0573fdbd4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -126,6 +126,15 @@ public class WALEdit implements Writable, HeapSize { return CellUtil.matchingFamily(cell, METAFAMILY); } + public boolean isMetaEdit() { + for (Cell cell: cells) { + if (!isMetaEditFamily(cell)) { + return false; + } + } + return true; + } + /** * @return True when current WALEdit is created by log replay. Replication skips WALEdits from * replay. @@ -345,7 +354,7 @@ public class WALEdit implements Writable, HeapSize { bulkLoadDescriptor.toByteArray()); return new WALEdit().add(kv); } - + /** * Deserialized and returns a BulkLoadDescriptor from the passed in Cell * @param cell the key value @@ -357,4 +366,4 @@ public class WALEdit implements Writable, HeapSize { } return null; } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index c3ec976ac27..978e85326fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; @@ -51,6 +52,7 @@ public interface ReplicationEndpoint extends Service { class Context { private final Configuration conf; private final FileSystem fs; + private final TableDescriptors tableDescriptors; private final ReplicationPeerConfig peerConfig; private final ReplicationPeer replicationPeer; private final String peerId; @@ -65,7 +67,8 @@ public interface ReplicationEndpoint extends Service { final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer, - final MetricsSource metrics) { + final MetricsSource metrics, + final TableDescriptors tableDescriptors) { this.peerConfig = peerConfig; this.conf = conf; this.fs = fs; @@ -73,6 +76,7 @@ public interface ReplicationEndpoint extends Service { this.peerId = peerId; this.replicationPeer = replicationPeer; this.metrics = metrics; + this.tableDescriptors = tableDescriptors; } public Configuration getConfiguration() { return conf; @@ -95,6 +99,9 @@ public interface ReplicationEndpoint extends Service { public MetricsSource getMetrics() { return metrics; } + public TableDescriptors getTableDescriptors() { + return tableDescriptors; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index 559b7bf6c18..c75f81f1b77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -43,9 +43,11 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; import org.apache.hadoop.hbase.client.ClusterConnection; @@ -94,6 +96,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { private Configuration conf; private ClusterConnection connection; + private TableDescriptors tableDescriptors; // Reuse WALSplitter constructs as a WAL pipe private PipelineController controller; @@ -150,6 +153,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { super.init(context); this.conf = HBaseConfiguration.create(context.getConfiguration()); + this.tableDescriptors = context.getTableDescriptors(); // HRS multiplies client retries by 10 globally for meta operations, but we do not want this. // We are resetting it here because we want default number of retries (35) rather than 10 times @@ -182,8 +186,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { try { connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf); this.pool = getDefaultThreadPool(conf); - outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool, - numWriterThreads, operationTimeout); + outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers, + connection, pool, numWriterThreads, operationTimeout); outputSink.startWriterThreads(); super.doStart(); } catch (IOException ex) { @@ -311,12 +315,28 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { } static class RegionReplicaOutputSink extends OutputSink { - private RegionReplicaSinkWriter sinkWriter; + private final RegionReplicaSinkWriter sinkWriter; + private final TableDescriptors tableDescriptors; + private final Cache memstoreReplicationEnabled; - public RegionReplicaOutputSink(PipelineController controller, EntryBuffers entryBuffers, - ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) { + public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors, + EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool, + int numWriters, int operationTimeout) { super(controller, entryBuffers, numWriters); this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout); + this.tableDescriptors = tableDescriptors; + + // A cache for the table "memstore replication enabled" flag. + // It has a default expiry of 5 sec. This means that if the table is altered + // with a different flag value, we might miss to replicate for that amount of + // time. But this cache avoid the slow lookup and parsing of the TableDescriptor. + int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration() + .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000); + this.memstoreReplicationEnabled = CacheBuilder.newBuilder() + .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS) + .initialCapacity(10) + .maximumSize(1000) + .build(); } @Override @@ -327,6 +347,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { return; } + // meta edits (e.g. flush) are always replicated. + // data edits (e.g. put) are replicated if the table requires them. + if (!requiresReplication(buffer.getTableName(), entries)) { + return; + } + sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(), entries.get(0).getEdit().getCells().get(0).getRow(), entries); } @@ -358,6 +384,44 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { AtomicLong getSkippedEditsCounter() { return skippedEdits; } + + /** + * returns true if the specified entry must be replicated. + * We should always replicate meta operations (e.g. flush) + * and use the user HTD flag to decide whether or not replicate the memstore. + */ + private boolean requiresReplication(final TableName tableName, final List entries) + throws IOException { + // unit-tests may not the TableDescriptors, bypass the check and always replicate + if (tableDescriptors == null) return true; + + Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName); + if (requiresReplication == null) { + // check if the table requires memstore replication + // some unit-test drop the table, so we should do a bypass check and always replicate. + HTableDescriptor htd = tableDescriptors.get(tableName); + requiresReplication = htd == null || htd.hasRegionMemstoreReplication(); + memstoreReplicationEnabled.put(tableName, requiresReplication); + } + + // if memstore replication is not required, check the entries. + // meta edits (e.g. flush) must be always replicated. + if (!requiresReplication) { + int skipEdits = 0; + java.util.Iterator it = entries.iterator(); + while (it.hasNext()) { + Entry entry = it.next(); + if (entry.getEdit().isMetaEdit()) { + requiresReplication = true; + } else { + it.remove(); + skipEdits++; + } + } + skippedEdits.addAndGet(skipEdits); + } + return requiresReplication; + } } static class RegionReplicaSinkWriter extends SinkWriter { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 4908ebc7c9a..4d9725702f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; @@ -200,7 +201,7 @@ public class ReplicationSourceManager implements ReplicationListener { } } } - + private void cleanOldLogs(SortedSet wals, String key, String id) { SortedSet walSet = wals.headSet(key); LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); @@ -302,7 +303,7 @@ public class ReplicationSourceManager implements ReplicationListener { protected Map> getWALs() { return Collections.unmodifiableMap(walsById); } - + /** * Get a copy of the wals of the recovered sources on this rs * @return a sorted set of wal names @@ -375,8 +376,10 @@ public class ReplicationSourceManager implements ReplicationListener { final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer) throws IOException { RegionServerCoprocessorHost rsServerHost = null; + TableDescriptors tableDescriptors = null; if (server instanceof HRegionServer) { rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); + tableDescriptors = ((HRegionServer) server).getTableDescriptors(); } ReplicationSourceInterface src; try { @@ -420,7 +423,7 @@ public class ReplicationSourceManager implements ReplicationListener { // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), - fs, peerConfig, peerId, clusterId, replicationPeer, metrics)); + fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); return src; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 84d38adb785..26290d6b007 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2203,10 +2203,20 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) throws IOException { + verifyNumericRows(region, f, startRow, endRow, true); + } + + public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow, + final boolean present) throws IOException { for (int i = startRow; i < endRow; i++) { String failMsg = "Failed verification of row :" + i; byte[] data = Bytes.toBytes(String.valueOf(i)); Result result = region.get(new Get(data)); + + boolean hasResult = result != null && !result.isEmpty(); + assertEquals(failMsg + result, present, hasResult); + if (!present) continue; + assertTrue(failMsg, result.containsColumn(f, null)); assertEquals(failMsg, result.getColumnCells(f, null).size(), 1); Cell cell = result.getColumnLatestCell(f, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index 886bd6e4606..1da289730c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -172,6 +172,11 @@ public class TestRegionReplicaReplicationEndpoint { private void verifyReplication(TableName tableName, int regionReplication, final int startRow, final int endRow) throws Exception { + verifyReplication(tableName, regionReplication, startRow, endRow, true); + } + + private void verifyReplication(TableName tableName, int regionReplication, + final int startRow, final int endRow, final boolean present) throws Exception { // find the regions final HRegion[] regions = new HRegion[regionReplication]; @@ -195,7 +200,7 @@ public class TestRegionReplicaReplicationEndpoint { public boolean evaluate() throws Exception { LOG.info("verifying replication for region replica:" + region.getRegionInfo()); try { - HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow); + HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present); } catch(Throwable ex) { LOG.warn("Verification from secondary region is not complete yet. Got:" + ex + " " + ex.getMessage()); @@ -223,6 +228,38 @@ public class TestRegionReplicaReplicationEndpoint { testRegionReplicaReplication(10); } + @Test (timeout = 240000) + public void testRegionReplicaWithoutMemstoreReplication() throws Exception { + int regionReplication = 3; + TableName tableName = TableName.valueOf("testRegionReplicaWithoutMemstoreReplication"); + HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); + htd.setRegionReplication(regionReplication); + htd.setRegionMemstoreReplication(false); + HTU.getHBaseAdmin().createTable(htd); + + Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + Table table = connection.getTable(tableName); + try { + // write data to the primary. The replicas should not receive the data + final int STEP = 100; + for (int i = 0; i < 3; ++i) { + final int startRow = i * STEP; + final int endRow = (i + 1) * STEP; + LOG.info("Writing data from " + startRow + " to " + endRow); + HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow); + verifyReplication(tableName, regionReplication, startRow, endRow, false); + + // Flush the table, now the data should show up in the replicas + LOG.info("flushing table"); + HTU.flush(tableName); + verifyReplication(tableName, regionReplication, 0, endRow, true); + } + } finally { + table.close(); + connection.close(); + } + } + @Test (timeout = 240000) public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception { // Tests a table with region replication 3. Writes some data, and causes flushes and