From 9ebaea9f549880a5ce2e0818d1f9d61beeec850e Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 28 Aug 2013 19:32:00 +0000 Subject: [PATCH] HBASE-7709 Infinite loop possible in Master/Master replication git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1518335 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hbase/client/Mutation.java | 45 +- .../hbase/protobuf/generated/WALProtos.java | 828 +++++++++++++++--- hbase-protocol/src/main/protobuf/WAL.proto | 17 +- .../apache/hadoop/hbase/mapreduce/Import.java | 9 +- .../protobuf/ReplicationProtbufUtil.java | 6 +- .../hbase/regionserver/BaseRowProcessor.java | 7 +- .../hadoop/hbase/regionserver/HRegion.java | 13 +- .../hbase/regionserver/RowProcessor.java | 5 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 22 +- .../hadoop/hbase/regionserver/wal/HLog.java | 19 +- .../hbase/regionserver/wal/HLogKey.java | 133 ++- .../hbase/regionserver/wal/HLogSplitter.java | 7 +- .../wal/SequenceFileLogReader.java | 2 +- .../regionserver/ReplicationSink.java | 16 +- .../regionserver/ReplicationSource.java | 18 +- .../hbase/snapshot/SnapshotLogSplitter.java | 2 +- .../hbase/regionserver/TestHRegion.java | 8 +- .../wal/HLogPerformanceEvaluation.java | 5 +- .../replication/TestMasterReplication.java | 489 ++++++----- 19 files changed, 1211 insertions(+), 440 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index 3afdd47cb3d..b02ecf6271d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -39,6 +39,10 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; + @InterfaceAudience.Public @InterfaceStability.Evolving public abstract class Mutation extends OperationWithAttributes implements Row, CellScannable, @@ -57,8 +61,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C // familyMap ClassSize.TREEMAP); - // Attribute used in Mutations to indicate the originating cluster. - private static final String CLUSTER_ID_ATTR = "_c.id_"; + /** + * The attribute for storing the list of clusters that have consumed the change. + */ + private static final String CONSUMED_CLUSTER_IDS = "_cs.id"; protected byte [] row = null; protected long ts = HConstants.LATEST_TIMESTAMP; @@ -225,26 +231,33 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C } /** - * Set the replication custer id. 
- * @param clusterId + * Marks that the clusters with the given clusterIds have consumed the mutation + * @param clusterIds of the clusters that have consumed the mutation */ - public void setClusterId(UUID clusterId) { - if (clusterId == null) return; - byte[] val = new byte[2*Bytes.SIZEOF_LONG]; - Bytes.putLong(val, 0, clusterId.getMostSignificantBits()); - Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits()); - setAttribute(CLUSTER_ID_ATTR, val); + public void setClusterIds(List clusterIds) { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + out.writeInt(clusterIds.size()); + for (UUID clusterId : clusterIds) { + out.writeLong(clusterId.getMostSignificantBits()); + out.writeLong(clusterId.getLeastSignificantBits()); + } + setAttribute(CONSUMED_CLUSTER_IDS, out.toByteArray()); } /** - * @return The replication cluster id. + * @return the set of clusterIds that have consumed the mutation */ - public UUID getClusterId() { - byte[] attr = getAttribute(CLUSTER_ID_ATTR); - if (attr == null) { - return HConstants.DEFAULT_CLUSTER_ID; + public List getClusterIds() { + List clusterIds = new ArrayList(); + byte[] bytes = getAttribute(CONSUMED_CLUSTER_IDS); + if(bytes != null) { + ByteArrayDataInput in = ByteStreams.newDataInput(bytes); + int numClusters = in.readInt(); + for(int i=0; ioptional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - boolean hasClusterId(); + @java.lang.Deprecated boolean hasClusterId(); /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId(); + @java.lang.Deprecated org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId(); /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder(); + @java.lang.Deprecated org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder(); // repeated .FamilyScope scopes = 6; /** @@ -609,30 +633,67 @@ public final class WALProtos { // optional uint32 following_kv_count = 7; /** * optional uint32 following_kv_count = 7; - * - *
-     *
-     *optional CustomEntryType custom_entry_type = 8;
-     *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
-     * 
*/ boolean hasFollowingKvCount(); /** * optional uint32 following_kv_count = 7; + */ + int getFollowingKvCount(); + + // repeated .UUID cluster_ids = 8; + /** + * repeated .UUID cluster_ids = 8; * *
      *
-     *optional CustomEntryType custom_entry_type = 8;
-     *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
*/ - int getFollowingKvCount(); + java.util.List + getClusterIdsList(); + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index); + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + int getClusterIdsCount(); + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + java.util.List + getClusterIdsOrBuilderList(); + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder( + int index); } /** * Protobuf type {@code WALKey} @@ -735,6 +796,14 @@ public final class WALProtos { followingKvCount_ = input.readUInt32(); break; } + case 66: { + if (!((mutable_bitField0_ & 0x00000080) == 0x00000080)) { + clusterIds_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000080; + } + clusterIds_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.PARSER, extensionRegistry)); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -746,6 +815,9 @@ public final class WALProtos { if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) { scopes_ = java.util.Collections.unmodifiableList(scopes_); } + if (((mutable_bitField0_ & 0x00000080) == 0x00000080)) { + clusterIds_ = java.util.Collections.unmodifiableList(clusterIds_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -842,25 +914,49 @@ public final class WALProtos { return writeTime_; } - // optional .UUID cluster_id = 5; + // optional .UUID cluster_id = 5 [deprecated = true]; public static final int CLUSTER_ID_FIELD_NUMBER = 5; private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID clusterId_; /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - public boolean hasClusterId() { + @java.lang.Deprecated public boolean hasClusterId() { return ((bitField0_ & 0x00000010) == 0x00000010); } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() { return clusterId_; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() { return clusterId_; } @@ -905,35 +1001,83 @@ public final class WALProtos { private int followingKvCount_; /** * optional uint32 following_kv_count = 7; - * - *
-     *
-     *optional CustomEntryType custom_entry_type = 8;
-     *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
-     * 
*/ public boolean hasFollowingKvCount() { return ((bitField0_ & 0x00000020) == 0x00000020); } /** * optional uint32 following_kv_count = 7; - * - *
-     *
-     *optional CustomEntryType custom_entry_type = 8;
-     *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
-     * 
*/ public int getFollowingKvCount() { return followingKvCount_; } + // repeated .UUID cluster_ids = 8; + public static final int CLUSTER_IDS_FIELD_NUMBER = 8; + private java.util.List clusterIds_; + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + public java.util.List getClusterIdsList() { + return clusterIds_; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + public java.util.List + getClusterIdsOrBuilderList() { + return clusterIds_; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + public int getClusterIdsCount() { + return clusterIds_.size(); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index) { + return clusterIds_.get(index); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder( + int index) { + return clusterIds_.get(index); + } + private void initFields() { encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; tableName_ = com.google.protobuf.ByteString.EMPTY; @@ -942,6 +1086,7 @@ public final class WALProtos { clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance(); scopes_ = java.util.Collections.emptyList(); followingKvCount_ = 0; + clusterIds_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -976,6 +1121,12 @@ public final class WALProtos { return false; } } + for (int i = 0; i < getClusterIdsCount(); i++) { + if (!getClusterIds(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -1004,6 +1155,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeUInt32(7, followingKvCount_); } + for (int i = 0; i < clusterIds_.size(); i++) { + output.writeMessage(8, clusterIds_.get(i)); + } getUnknownFields().writeTo(output); } @@ -1041,6 +1195,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(7, followingKvCount_); } + for (int i = 0; i < clusterIds_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(8, clusterIds_.get(i)); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1096,6 +1254,8 @@ public final class WALProtos { result = result && (getFollowingKvCount() == other.getFollowingKvCount()); } + result = result && getClusterIdsList() + .equals(other.getClusterIdsList()); result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1137,6 +1297,10 @@ public final class WALProtos { hash = (37 * hash) + FOLLOWING_KV_COUNT_FIELD_NUMBER; hash = (53 * hash) + getFollowingKvCount(); } + if (getClusterIdsCount() > 0) { + hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER; + hash = (53 * hash) + getClusterIdsList().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1244,6 +1408,7 @@ public final class WALProtos { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getClusterIdFieldBuilder(); getScopesFieldBuilder(); + getClusterIdsFieldBuilder(); } } private static Builder create() { @@ -1274,6 +1439,12 @@ public final class WALProtos { } followingKvCount_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + if (clusterIdsBuilder_ == null) { + clusterIds_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000080); + } else { + clusterIdsBuilder_.clear(); + } return this; } @@ -1339,6 +1510,15 @@ public final class WALProtos { to_bitField0_ |= 0x00000020; } result.followingKvCount_ = followingKvCount_; + if (clusterIdsBuilder_ == null) { + if (((bitField0_ & 0x00000080) == 0x00000080)) { + clusterIds_ = java.util.Collections.unmodifiableList(clusterIds_); + bitField0_ = (bitField0_ & ~0x00000080); + } + result.clusterIds_ = clusterIds_; + } else { + result.clusterIds_ = clusterIdsBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1399,6 +1579,32 @@ public final class WALProtos { if (other.hasFollowingKvCount()) { setFollowingKvCount(other.getFollowingKvCount()); } + if (clusterIdsBuilder_ == null) { + if (!other.clusterIds_.isEmpty()) { + if (clusterIds_.isEmpty()) { + 
clusterIds_ = other.clusterIds_; + bitField0_ = (bitField0_ & ~0x00000080); + } else { + ensureClusterIdsIsMutable(); + clusterIds_.addAll(other.clusterIds_); + } + onChanged(); + } + } else { + if (!other.clusterIds_.isEmpty()) { + if (clusterIdsBuilder_.isEmpty()) { + clusterIdsBuilder_.dispose(); + clusterIdsBuilder_ = null; + clusterIds_ = other.clusterIds_; + bitField0_ = (bitField0_ & ~0x00000080); + clusterIdsBuilder_ = + com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? + getClusterIdsFieldBuilder() : null; + } else { + clusterIdsBuilder_.addAllMessages(other.clusterIds_); + } + } + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1432,6 +1638,12 @@ public final class WALProtos { return false; } } + for (int i = 0; i < getClusterIdsCount(); i++) { + if (!getClusterIds(i).isInitialized()) { + + return false; + } + } return true; } @@ -1592,20 +1804,36 @@ public final class WALProtos { return this; } - // optional .UUID cluster_id = 5; + // optional .UUID cluster_id = 5 [deprecated = true]; private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance(); private com.google.protobuf.SingleFieldBuilder< org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> clusterIdBuilder_; /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public boolean hasClusterId() { + @java.lang.Deprecated public boolean hasClusterId() { return ((bitField0_ & 0x00000010) == 0x00000010); } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() { if (clusterIdBuilder_ == null) { return clusterId_; } else { @@ -1613,9 +1841,17 @@ public final class WALProtos { } } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public Builder setClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + @java.lang.Deprecated public Builder setClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { if (clusterIdBuilder_ == null) { if (value == null) { throw new NullPointerException(); @@ -1629,9 +1865,17 @@ public final class WALProtos { return this; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public Builder setClusterId( + @java.lang.Deprecated public Builder setClusterId( org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) { if (clusterIdBuilder_ == null) { clusterId_ = builderForValue.build(); @@ -1643,9 +1887,17 @@ public final class WALProtos { return this; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public Builder mergeClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + @java.lang.Deprecated public Builder mergeClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { if (clusterIdBuilder_ == null) { if (((bitField0_ & 0x00000010) == 0x00000010) && clusterId_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance()) { @@ -1662,9 +1914,17 @@ public final class WALProtos { return this; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public Builder clearClusterId() { + @java.lang.Deprecated public Builder clearClusterId() { if (clusterIdBuilder_ == null) { clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance(); onChanged(); @@ -1675,17 +1935,33 @@ public final class WALProtos { return this; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdBuilder() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdBuilder() { bitField0_ |= 0x00000010; onChanged(); return getClusterIdFieldBuilder().getBuilder(); } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() { if (clusterIdBuilder_ != null) { return clusterIdBuilder_.getMessageOrBuilder(); } else { @@ -1693,7 +1969,15 @@ public final class WALProtos { } } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ private com.google.protobuf.SingleFieldBuilder< org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> @@ -1953,45 +2237,18 @@ public final class WALProtos { private int followingKvCount_ ; /** * optional uint32 following_kv_count = 7; - * - *
-       *
-       *optional CustomEntryType custom_entry_type = 8;
-       *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
-       * 
*/ public boolean hasFollowingKvCount() { return ((bitField0_ & 0x00000040) == 0x00000040); } /** * optional uint32 following_kv_count = 7; - * - *
-       *
-       *optional CustomEntryType custom_entry_type = 8;
-       *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
-       * 
*/ public int getFollowingKvCount() { return followingKvCount_; } /** * optional uint32 following_kv_count = 7; - * - *
-       *
-       *optional CustomEntryType custom_entry_type = 8;
-       *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
-       * 
*/ public Builder setFollowingKvCount(int value) { bitField0_ |= 0x00000040; @@ -2001,15 +2258,6 @@ public final class WALProtos { } /** * optional uint32 following_kv_count = 7; - * - *
-       *
-       *optional CustomEntryType custom_entry_type = 8;
-       *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
-       * 
*/ public Builder clearFollowingKvCount() { bitField0_ = (bitField0_ & ~0x00000040); @@ -2018,6 +2266,354 @@ public final class WALProtos { return this; } + // repeated .UUID cluster_ids = 8; + private java.util.List clusterIds_ = + java.util.Collections.emptyList(); + private void ensureClusterIdsIsMutable() { + if (!((bitField0_ & 0x00000080) == 0x00000080)) { + clusterIds_ = new java.util.ArrayList(clusterIds_); + bitField0_ |= 0x00000080; + } + } + + private com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> clusterIdsBuilder_; + + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public java.util.List getClusterIdsList() { + if (clusterIdsBuilder_ == null) { + return java.util.Collections.unmodifiableList(clusterIds_); + } else { + return clusterIdsBuilder_.getMessageList(); + } + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public int getClusterIdsCount() { + if (clusterIdsBuilder_ == null) { + return clusterIds_.size(); + } else { + return clusterIdsBuilder_.getCount(); + } + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index) { + if (clusterIdsBuilder_ == null) { + return clusterIds_.get(index); + } else { + return clusterIdsBuilder_.getMessage(index); + } + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder setClusterIds( + int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + if (clusterIdsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.set(index, value); + onChanged(); + } else { + clusterIdsBuilder_.setMessage(index, value); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder setClusterIds( + int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + clusterIds_.set(index, builderForValue.build()); + onChanged(); + } else { + clusterIdsBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder addClusterIds(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + if (clusterIdsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(value); + onChanged(); + } else { + clusterIdsBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder addClusterIds( + int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + if (clusterIdsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(index, value); + onChanged(); + } else { + clusterIdsBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder addClusterIds( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + clusterIds_.add(builderForValue.build()); + onChanged(); + } else { + clusterIdsBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder addClusterIds( + int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + clusterIds_.add(index, builderForValue.build()); + onChanged(); + } else { + clusterIdsBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder addAllClusterIds( + java.lang.Iterable values) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + super.addAll(values, clusterIds_); + onChanged(); + } else { + clusterIdsBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder clearClusterIds() { + if (clusterIdsBuilder_ == null) { + clusterIds_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000080); + onChanged(); + } else { + clusterIdsBuilder_.clear(); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder removeClusterIds(int index) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + clusterIds_.remove(index); + onChanged(); + } else { + clusterIdsBuilder_.remove(index); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdsBuilder( + int index) { + return getClusterIdsFieldBuilder().getBuilder(index); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder( + int index) { + if (clusterIdsBuilder_ == null) { + return clusterIds_.get(index); } else { + return clusterIdsBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public java.util.List + getClusterIdsOrBuilderList() { + if (clusterIdsBuilder_ != null) { + return clusterIdsBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(clusterIds_); + } + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder addClusterIdsBuilder() { + return getClusterIdsFieldBuilder().addBuilder( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance()); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder addClusterIdsBuilder( + int index) { + return getClusterIdsFieldBuilder().addBuilder( + index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance()); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public java.util.List + getClusterIdsBuilderList() { + return getClusterIdsFieldBuilder().getBuilderList(); + } + private com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> + getClusterIdsFieldBuilder() { + if (clusterIdsBuilder_ == null) { + clusterIdsBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder>( + clusterIds_, + ((bitField0_ & 0x00000080) == 0x00000080), + getParentForChildren(), + isClean()); + clusterIds_ = null; + } + return clusterIdsBuilder_; + } + // @@protoc_insertion_point(builder_scope:WALKey) } @@ -4216,22 +4812,22 @@ public final class WALProtos { static { java.lang.String[] descriptorData = { "\n\tWAL.proto\032\013hbase.proto\"$\n\tWALHeader\022\027\n" + - "\017has_compression\030\001 \001(\010\"\277\001\n\006WALKey\022\033\n\023enc" + + "\017has_compression\030\001 \001(\010\"\337\001\n\006WALKey\022\033\n\023enc" + "oded_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002" + "(\014\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite" + - "_time\030\004 \002(\004\022\031\n\ncluster_id\030\005 \001(\0132\005.UUID\022\034" + - "\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022followin" + - "g_kv_count\030\007 \001(\r\"=\n\013FamilyScope\022\016\n\006famil" + - "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" + - "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" + - " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam", - "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" + - "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" + - "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" + - "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" + - "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" + - "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" + - "\001\001" + "_time\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002" + + "\030\001\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022foll" + + "owing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(" + + "\0132\005.UUID\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022" + + "\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Comp" + + "actionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023", + "encoded_region_name\030\002 \002(\014\022\023\n\013family_name" + + "\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021comp" + + "action_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 " + + "\002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLIC" + + "ATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE" + + "_GLOBAL\020\001B?\n*org.apache.hadoop.hbase.pro" + + "tobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4249,7 +4845,7 @@ public final class WALProtos { internal_static_WALKey_fieldAccessorTable = new 
com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALKey_descriptor, - new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", }); + new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", }); internal_static_FamilyScope_descriptor = getDescriptor().getMessageTypes().get(2); internal_static_FamilyScope_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index d0551ffb18e..bf2a5135ce2 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -33,13 +33,24 @@ message WALKey { required bytes table_name = 2; required uint64 log_sequence_number = 3; required uint64 write_time = 4; - optional UUID cluster_id = 5; + /* + This parameter is deprecated in favor of clusters which + contains the list of clusters that have consumed the change. + It is retained so that the log created by earlier releases (0.94) + can be read by the newer releases. + */ + optional UUID cluster_id = 5 [deprecated=true]; repeated FamilyScope scopes = 6; optional uint32 following_kv_count = 7; + /* + This field contains the list of clusters that have + consumed the change + */ + repeated UUID cluster_ids = 8; /* - optional CustomEntryType custom_entry_type = 8; - + optional CustomEntryType custom_entry_type = 9; + enum CustomEntryType { COMPACTION = 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 32ca435d588..72d30f00fa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -114,7 +115,7 @@ public class Import { static class Importer extends TableMapper { private Map cfRenameMap; - private UUID clusterId; + private List clusterIds; /** * @param row The current table row key. 
@@ -159,11 +160,11 @@ public class Import { } } if (put != null) { - put.setClusterId(clusterId); + put.setClusterIds(clusterIds); context.write(key, put); } if (delete != null) { - delete.setClusterId(clusterId); + delete.setClusterIds(clusterIds); context.write(key, delete); } } @@ -177,7 +178,7 @@ public class Import { ZooKeeperWatcher zkw = null; try { zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null); - clusterId = ZKClusterId.getUUIDForCluster(zkw); + clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw)); } catch (ZooKeeperConnectionException e) { LOG.error("Problem connecting to ZooKeper during task setup", e); } catch (KeeperException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 5bad72c94e1..b9019e0a0d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -115,6 +115,7 @@ public class ReplicationProtbufUtil { AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); AdminProtos.ReplicateWALEntryRequest.Builder builder = AdminProtos.ReplicateWALEntryRequest.newBuilder(); + HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); for (HLog.Entry entry: entries) { entryBuilder.clear(); WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder(); @@ -124,11 +125,10 @@ public class ReplicationProtbufUtil { keyBuilder.setTableName(ByteString.copyFrom(key.getTablename().getName())); keyBuilder.setLogSequenceNumber(key.getLogSeqNum()); keyBuilder.setWriteTime(key.getWriteTime()); - UUID clusterId = key.getClusterId(); - if (clusterId != null) { - HBaseProtos.UUID.Builder uuidBuilder = keyBuilder.getClusterIdBuilder(); + for(UUID clusterId : key.getClusterIds()) { uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); + keyBuilder.addClusterIds(uuidBuilder.build()); } WALEdit edit = entry.getEdit(); NavigableMap scopes = key.getScopes(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java index 7299f0db2f8..38e0c0f49ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java @@ -18,9 +18,10 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -41,8 +42,8 @@ implements RowProcessor { } @Override - public UUID getClusterId() { - return HConstants.DEFAULT_CLUSTER_ID; + public List getClusterIds() { + return new ArrayList(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9bcf5d7f58c..545e65ca8ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1779,15 +1779,13 
@@ public class HRegion implements HeapSize { // , Writable{ /** * This is used only by unit tests. Not required to be a public API. * @param familyMap map of family to edits for the given family. - * @param clusterId * @param durability * @throws IOException */ - void delete(NavigableMap> familyMap, UUID clusterId, + void delete(NavigableMap> familyMap, Durability durability) throws IOException { Delete delete = new Delete(FOR_UNIT_TESTS_ONLY); delete.setFamilyMap(familyMap); - delete.setClusterId(clusterId); delete.setDurability(durability); doBatchMutate(delete); } @@ -2206,7 +2204,7 @@ public class HRegion implements HeapSize { // , Writable{ Mutation mutation = batchOp.operations[firstIndex]; if (walEdit.size() > 0) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdit, mutation.getClusterId(), now, this.htableDescriptor); + walEdit, mutation.getClusterIds(), now, this.htableDescriptor); } // ------------------------------- @@ -2598,7 +2596,6 @@ public class HRegion implements HeapSize { // , Writable{ familyMap.put(family, edits); Put p = new Put(row); p.setFamilyMap(familyMap); - p.setClusterId(HConstants.DEFAULT_CLUSTER_ID); doBatchMutate(p); } @@ -4534,7 +4531,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!walEdit.isEmpty()) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdit, - processor.getClusterId(), now, this.htableDescriptor); + processor.getClusterIds(), now, this.htableDescriptor); } // 8. Release region lock if (locked) { @@ -4761,7 +4758,7 @@ public class HRegion implements HeapSize { // , Writable{ // cluster. A slave cluster receives the final value (not the delta) // as a Put. txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), + walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor); } else { recordMutationWithoutWal(append.getFamilyCellMap()); @@ -4911,7 +4908,7 @@ public class HRegion implements HeapSize { // , Writable{ // cluster. A slave cluster receives the final value (not the delta) // as a Put. txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), + walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor); } else { recordMutationWithoutWal(increment.getFamilyCellMap()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java index 00f9d5d6d39..72ac84cf14d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import com.google.protobuf.ByteString; import com.google.protobuf.Message; @InterfaceAudience.Public @@ -107,9 +106,9 @@ public interface RowProcessor { /** - * @return The replication cluster id. + * @return The cluster ids that have the change. 
*/ - UUID getClusterId(); + List getClusterIds(); /** * Human readable name of the processor diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 8cd86d7a486..b29bb719867 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -27,6 +27,7 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -821,12 +822,12 @@ class FSHLog implements HLog, Syncable { * @param encodedRegionName Encoded name of the region as returned by * HRegionInfo#getEncodedNameAsBytes(). * @param tableName - * @param clusterId + * @param clusterIds that have consumed the change * @return New log key. */ protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum, - long now, UUID clusterId) { - return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId); + long now, List clusterIds) { + return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds); } @Override @@ -839,7 +840,7 @@ class FSHLog implements HLog, Syncable { @Override public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException { - append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore); + append(info, tableName, edits, new ArrayList(), now, htd, true, isInMemstore); } /** @@ -862,15 +863,16 @@ class FSHLog implements HLog, Syncable { * @param info * @param tableName * @param edits - * @param clusterId The originating clusterId for this edit (for replication) + * @param clusterIds that have consumed the change (for replication) * @param now * @param doSync shall we sync? * @return txid of this transaction * @throws IOException */ @SuppressWarnings("deprecation") - private long append(HRegionInfo info, TableName tableName, WALEdit edits, UUID clusterId, - final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore) + private long append(HRegionInfo info, TableName tableName, WALEdit edits, + List clusterIds, final long now, HTableDescriptor htd, boolean doSync, + boolean isInMemstore) throws IOException { if (edits.isEmpty()) return this.unflushedEntries.get(); if (this.closed) { @@ -890,7 +892,7 @@ class FSHLog implements HLog, Syncable { // actual name. 
byte [] encodedRegionName = info.getEncodedNameAsBytes(); if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); - HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId); + HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds); doWrite(info, logKey, edits, htd); this.numEntries.incrementAndGet(); txid = this.unflushedEntries.incrementAndGet(); @@ -914,9 +916,9 @@ class FSHLog implements HLog, Syncable { @Override public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - UUID clusterId, final long now, HTableDescriptor htd) + List clusterIds, final long now, HTableDescriptor htd) throws IOException { - return append(info, tableName, edits, clusterId, now, htd, false, true); + return append(info, tableName, edits, clusterIds, now, htd, false, true); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 7e37ccdad16..f03561c8501 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; @@ -264,7 +266,7 @@ public interface HLog { void closeAndDelete() throws IOException; /** - * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, UUID, long, HTableDescriptor)}, + * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor)}, * except it causes a sync on the log */ public void append(HRegionInfo info, TableName tableName, WALEdit edits, @@ -285,23 +287,20 @@ public interface HLog { final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException; /** - * Append a set of edits to the log. Log edits are keyed by (encoded) - * regionName, rowname, and log-sequence-id. The HLog is not flushed after - * this transaction is written to the log. - * + * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and + * log-sequence-id. The HLog is not flushed after this transaction is written to the log. 
* @param info * @param tableName * @param edits - * @param clusterId - * The originating clusterId for this edit (for replication) + * @param clusterIds The clusters that have consumed the change (for replication) * @param now * @param htd * @return txid of this transaction * @throws IOException */ - public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - UUID clusterId, final long now, HTableDescriptor htd) throws IOException; - + public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, + List clusterIds, final long now, HTableDescriptor htd) throws IOException; + void hsync() throws IOException; void hflush() throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 481ecc6dbbd..1ed0d9b2638 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -22,7 +22,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.EOFException; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -93,6 +97,13 @@ public class HLogKey implements WritableComparable { } } + /* + * This is used for reading the log entries created by the previous releases + * (0.94.11) which write the clusters information to the scopes of WALEdit. + */ + private static final String PREFIX_CLUSTER_KEY = "."; + + private static final Version VERSION = Version.COMPRESSED; // The encoded region name. @@ -102,15 +113,23 @@ public class HLogKey implements WritableComparable { // Time at which this edit was written. private long writeTime; - private UUID clusterId; - + // The first element in the list is the cluster id on which the change has originated + private List clusterIds; + private NavigableMap scopes; private CompressionContext compressionContext; public HLogKey() { - this(null, null, 0L, HConstants.LATEST_TIMESTAMP, - HConstants.DEFAULT_CLUSTER_ID); + init(null, null, 0L, HConstants.LATEST_TIMESTAMP, + new ArrayList()); + } + + public HLogKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, + final long now, UUID clusterId) { + List clusterIds = new ArrayList(); + clusterIds.add(clusterId); + init(encodedRegionName, tablename, logSeqNum, now, clusterIds); } /** @@ -123,13 +142,18 @@ public class HLogKey implements WritableComparable { * @param tablename - name of table * @param logSeqNum - log sequence number * @param now Time at which this edit was written. 
- * @param clusterId of the cluster (used in Replication) + * @param clusterIds the clusters that have consumed the change(used in Replication) */ public HLogKey(final byte [] encodedRegionName, final TableName tablename, - long logSeqNum, final long now, UUID clusterId) { + long logSeqNum, final long now, List clusterIds){ + init(encodedRegionName, tablename, logSeqNum, now, clusterIds); + } + + protected void init(final byte [] encodedRegionName, final TableName tablename, + long logSeqNum, final long now, List clusterIds) { this.logSeqNum = logSeqNum; this.writeTime = now; - this.clusterId = clusterId; + this.clusterIds = clusterIds; this.encodedRegionName = encodedRegionName; this.tablename = tablename; } @@ -171,14 +195,6 @@ public class HLogKey implements WritableComparable { return this.writeTime; } - /** - * Get the id of the original cluster - * @return Cluster id. - */ - public UUID getClusterId() { - return clusterId; - } - public NavigableMap getScopes() { return scopes; } @@ -187,12 +203,47 @@ public class HLogKey implements WritableComparable { this.scopes = scopes; } + public void readOlderScopes(NavigableMap scopes) { + if (scopes != null) { + Iterator> iterator = scopes.entrySet() + .iterator(); + while (iterator.hasNext()) { + Map.Entry scope = iterator.next(); + String key = Bytes.toString(scope.getKey()); + if (key.startsWith(PREFIX_CLUSTER_KEY)) { + addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY + .length()))); + iterator.remove(); + } + } + if (scopes.size() > 0) { + this.scopes = scopes; + } + } + } + /** - * Set the cluster id of this key. - * @param clusterId + * Marks that the cluster with the given clusterId has consumed the change */ - public void setClusterId(UUID clusterId) { - this.clusterId = clusterId; + public void addClusterId(UUID clusterId) { + if (!clusterIds.contains(clusterId)) { + clusterIds.add(clusterId); + } + } + + /** + * @return the set of cluster Ids that have consumed the change + */ + public List getClusterIds() { + return clusterIds; + } + + /** + * @return the cluster id on which the change has originated. It there is no such cluster, it + * returns DEFAULT_CLUSTER_ID (cases where replication is not enabled) + */ + public UUID getOriginatingClusterId(){ + return clusterIds.isEmpty() ? 
HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0); } @Override @@ -232,7 +283,6 @@ public class HLogKey implements WritableComparable { int result = Bytes.hashCode(this.encodedRegionName); result ^= this.logSeqNum; result ^= this.writeTime; - result ^= this.clusterId.hashCode(); return result; } @@ -299,13 +349,16 @@ public class HLogKey implements WritableComparable { } out.writeLong(this.logSeqNum); out.writeLong(this.writeTime); - // avoid storing 16 bytes when replication is not enabled - if (this.clusterId == HConstants.DEFAULT_CLUSTER_ID) { - out.writeBoolean(false); - } else { + // Don't need to write the clusters information as we are using protobufs from 0.95 + // Writing only the first clusterId for testing the legacy read + Iterator iterator = clusterIds.iterator(); + if(iterator.hasNext()){ out.writeBoolean(true); - out.writeLong(this.clusterId.getMostSignificantBits()); - out.writeLong(this.clusterId.getLeastSignificantBits()); + UUID clusterId = iterator.next(); + out.writeLong(clusterId.getMostSignificantBits()); + out.writeLong(clusterId.getLeastSignificantBits()); + } else { + out.writeBoolean(false); } } @@ -344,10 +397,13 @@ public class HLogKey implements WritableComparable { this.logSeqNum = in.readLong(); this.writeTime = in.readLong(); - this.clusterId = HConstants.DEFAULT_CLUSTER_ID; + + this.clusterIds.clear(); if (version.atLeast(Version.INITIAL)) { if (in.readBoolean()) { - this.clusterId = new UUID(in.readLong(), in.readLong()); + // read the older log + // Definitely is the originating cluster + clusterIds.add(new UUID(in.readLong(), in.readLong())); } } else { try { @@ -357,6 +413,7 @@ public class HLogKey implements WritableComparable { // Means it's a very old key, just continue } } + // Do not need to read the clusters information as we are using protobufs from 0.95 } public WALKey.Builder getBuilder( @@ -373,10 +430,11 @@ public class HLogKey implements WritableComparable { } builder.setLogSequenceNumber(this.logSeqNum); builder.setWriteTime(writeTime); - if (this.clusterId != HConstants.DEFAULT_CLUSTER_ID) { - builder.setClusterId(HBaseProtos.UUID.newBuilder() - .setLeastSigBits(this.clusterId.getLeastSignificantBits()) - .setMostSigBits(this.clusterId.getMostSignificantBits())); + HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); + for (UUID clusterId : clusterIds) { + uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); + uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); + builder.addClusterIds(uuidBuilder.build()); } if (scopes != null) { for (Map.Entry e : scopes.entrySet()) { @@ -401,10 +459,15 @@ public class HLogKey implements WritableComparable { this.encodedRegionName = walKey.getEncodedRegionName().toByteArray(); this.tablename = TableName.valueOf(walKey.getTableName().toByteArray()); } - this.clusterId = HConstants.DEFAULT_CLUSTER_ID; + clusterIds.clear(); if (walKey.hasClusterId()) { - this.clusterId = new UUID( - walKey.getClusterId().getMostSigBits(), walKey.getClusterId().getLeastSigBits()); + //When we are reading the older log (0.95.1 release) + //This is definitely the originating cluster + clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId() + .getLeastSigBits())); + } + for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) { + clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits())); } this.scopes = null; if (walKey.getScopesCount() > 0) { diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index e473f2ff916..2bdd63c760f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -22,8 +22,6 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.text.ParseException; import java.util.ArrayList; import java.util.Collections; @@ -37,7 +35,6 @@ import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; @@ -1484,11 +1481,11 @@ public class HLogSplitter { if (kv.isDelete()) { del = new Delete(kv.getRow()); - del.setClusterId(entry.getKey().getClusterId()); + del.setClusterIds(entry.getKey().getClusterIds()); preRow = del; } else { put = new Put(kv.getRow()); - put.setClusterId(entry.getKey().getClusterId()); + put.setClusterIds(entry.getKey().getClusterIds()); preRow = put; } preKey = loc.getHostnamePort() + KEY_DELIMITER + table; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index b654798a825..9dc0719ef91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -217,7 +217,7 @@ public class SequenceFileLogReader extends ReaderBase { // Scopes are probably in WAL edit, move to key NavigableMap scopes = e.getEdit().getAndRemoveScopes(); if (scopes != null) { - e.getKey().setScopes(scopes); + e.getKey().readOlderScopes(scopes); } return true; } catch (IOException ioe) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index b0e9128be95..258560ef9f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -116,13 +116,13 @@ public class ReplicationSink { long totalReplicated = 0; // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per // invocation of this method per table and cluster id. - Map>> rowMap = new TreeMap>>(); + Map, List>> rowMap = + new TreeMap, List>>(); for (WALEntry entry : entries) { TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); Cell previousCell = null; Mutation m = null; - java.util.UUID uuid = toUUID(entry.getKey().getClusterId()); int count = entry.getAssociatedCellCount(); for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off @@ -135,8 +135,12 @@ public class ReplicationSink { m = CellUtil.isDelete(cell)? 
new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()): new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - m.setClusterId(uuid); - addToHashMultiMap(rowMap, table, uuid, m); + List clusterIds = new ArrayList(); + for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){ + clusterIds.add(toUUID(clusterId)); + } + m.setClusterIds(clusterIds); + addToHashMultiMap(rowMap, table, clusterIds, m); } if (CellUtil.isDelete(cell)) { ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell)); @@ -147,7 +151,7 @@ public class ReplicationSink { } totalReplicated++; } - for (Entry>> entry : rowMap.entrySet()) { + for (Entry,List>> entry : rowMap.entrySet()) { batch(entry.getKey(), entry.getValue().values()); } int size = entries.size(); @@ -181,7 +185,7 @@ public class ReplicationSink { * @param key1 * @param key2 * @param value - * @return + * @return the list of values corresponding to key1 and key2 */ private List addToHashMultiMap(Map>> map, K1 key1, K2 key2, V value) { Map> innerMap = map.get(key1); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 51fe16ed580..199c8d7744c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -38,7 +38,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; @@ -55,10 +54,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; -import org.apache.zookeeper.KeeperException; /** * Class that handles the source of a replication stream. @@ -395,20 +392,15 @@ public class ReplicationSource extends Thread seenEntries++; // Remove all KVs that should not be replicated HLogKey logKey = entry.getKey(); - // don't replicate if the log entries originated in the peer - if (!logKey.getClusterId().equals(peerClusterId)) { + // don't replicate if the log entries have already been consumed by the cluster + if (!logKey.getClusterIds().contains(peerClusterId)) { removeNonReplicableEdits(entry); // Don't replicate catalog entries, if the WALEdit wasn't // containing anything to replicate and if we're currently not set to replicate if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) && edit.size() != 0) { - // Only set the clusterId if is a local key. - // This ensures that the originator sets the cluster id - // and all replicas retain the initial cluster id. - // This is *only* place where a cluster id other than the default is set. 
- if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) { - logKey.setClusterId(this.clusterId); - } + //Mark that the current cluster has the change + logKey.addClusterId(clusterId); currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; currentSize += entry.getEdit().size(); @@ -817,4 +809,4 @@ public class ReplicationSource extends Thread ", currently replicating from: " + this.currentPath + " at position: " + position; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java index 219e5aa4780..8cac45cf0c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java @@ -172,7 +172,7 @@ class SnapshotLogSplitter implements Closeable { // Append Entry key = new HLogKey(newRegionName, tableName, - key.getLogSeqNum(), key.getWriteTime(), key.getClusterId()); + key.getLogSeqNum(), key.getWriteTime(), key.getClusterIds()); writer.append(new HLog.Entry(key, entry.getEdit())); } } catch (IOException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index bc7d0405e84..7dab03d87e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1231,7 +1231,7 @@ public class TestHRegion extends HBaseTestCase { NavigableMap> deleteMap = new TreeMap>(Bytes.BYTES_COMPARATOR); deleteMap.put(family, kvs); - region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL); + region.delete(deleteMap, Durability.SYNC_WAL); } catch (Exception e) { assertTrue("Family " +new String(family)+ " does not exist", false); } @@ -1243,7 +1243,7 @@ public class TestHRegion extends HBaseTestCase { NavigableMap> deleteMap = new TreeMap>(Bytes.BYTES_COMPARATOR); deleteMap.put(family, kvs); - region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL); + region.delete(deleteMap, Durability.SYNC_WAL); } catch (Exception e) { ok = true; } @@ -1571,7 +1571,7 @@ public class TestHRegion extends HBaseTestCase { NavigableMap> deleteMap = new TreeMap>(Bytes.BYTES_COMPARATOR); deleteMap.put(fam1, kvs); - region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL); + region.delete(deleteMap, Durability.SYNC_WAL); // extract the key values out the memstore: // This is kinda hacky, but better than nothing... @@ -3853,7 +3853,7 @@ public class TestHRegion extends HBaseTestCase { //verify append called or not verify(log, expectAppend ? 
times(1) : never()) .appendNoSync((HRegionInfo)any(), eq(tableName), - (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any()); + (WALEdit)any(), (List)any(), anyLong(), (HTableDescriptor)any()); //verify sync called or not if (expectSync || expectSyncFromLogSyncer) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 52a62429b4a..150e54e3129 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; @@ -103,7 +104,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool HRegionInfo hri = region.getRegionInfo(); if (this.noSync) { hlog.appendNoSync(hri, hri.getTableName(), walEdit, - HConstants.DEFAULT_CLUSTER_ID, now, htd); + new ArrayList(), now, htd); } else { hlog.append(hri, hri.getTableName(), walEdit, now, htd); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index a43ecff643a..22c6814456a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,27 +21,35 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import java.io.Closeable; import java.io.IOException; import java.util.List; - +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; @@ -55,18 +62,14 @@ public class TestMasterReplication { private static final Log LOG = LogFactory.getLog(TestReplicationBase.class); - private Configuration conf1; - private Configuration conf2; - private Configuration conf3; + private Configuration baseConfiguration; - private HBaseTestingUtility utility1; - private HBaseTestingUtility utility2; - private HBaseTestingUtility utility3; - - private MiniZooKeeperCluster miniZK; + private HBaseTestingUtility[] utilities; + private Configuration[] configurations; + private MiniZooKeeperCluster miniZK; private static final long SLEEP_TIME = 500; - private static final int NB_RETRIES = 100; + private static final int NB_RETRIES = 10; private static final byte[] tableName = Bytes.toBytes("test"); private static final byte[] famName = Bytes.toBytes("f"); @@ -85,44 +88,21 @@ public class TestMasterReplication { @Before public void setUp() throws Exception { - conf1 = HBaseConfiguration.create(); - conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + baseConfiguration = HBaseConfiguration.create(); // smaller block size and capacity to trigger more operations // and test them - conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20); - conf1.setInt("replication.source.size.capacity", 1024); - conf1.setLong("replication.source.sleepforretries", 100); - conf1.setInt("hbase.regionserver.maxlogs", 10); - conf1.setLong("hbase.master.logcleaner.ttl", 10); - conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); - conf1.setBoolean("dfs.support.append", true); - conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); - conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + 
baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20); + baseConfiguration.setInt("replication.source.size.capacity", 1024); + baseConfiguration.setLong("replication.source.sleepforretries", 100); + baseConfiguration.setInt("hbase.regionserver.maxlogs", 10); + baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10); + baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + baseConfiguration.setBoolean("dfs.support.append", true); + baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + baseConfiguration.setStrings( + CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, CoprocessorCounter.class.getName()); - utility1 = new HBaseTestingUtility(conf1); - utility1.startMiniZKCluster(); - miniZK = utility1.getZkCluster(); - // By setting the mini ZK cluster through this method, even though this is - // already utility1's mini ZK cluster, we are telling utility1 not to shut - // the mini ZK cluster when we shut down the HBase cluster. - utility1.setZkCluster(miniZK); - new ZooKeeperWatcher(conf1, "cluster1", null, true); - - conf2 = new Configuration(conf1); - conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); - - utility2 = new HBaseTestingUtility(conf2); - utility2.setZkCluster(miniZK); - new ZooKeeperWatcher(conf2, "cluster2", null, true); - - conf3 = new Configuration(conf1); - conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); - - utility3 = new HBaseTestingUtility(conf3); - utility3.setZkCluster(miniZK); - new ZooKeeperWatcher(conf3, "cluster3", null, true); - table = new HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor fam = new HColumnDescriptor(famName); fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); @@ -131,209 +111,325 @@ public class TestMasterReplication { table.addFamily(fam); } - @After - public void tearDown() throws IOException { - miniZK.shutdown(); - } + /** + * It tests the replication scenario involving 0 -> 1 -> 0. It does it by + * adding and deleting a row to a table in each cluster, checking if it's + * replicated. It also tests that the puts and deletes are not replicated back + * to the originating cluster. 
+   */
+  @Test(timeout = 300000)
+  public void testCyclicReplication1() throws Exception {
+    LOG.info("testCyclicReplication1");
+    int numClusters = 2;
+    HTable[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
-  @Test(timeout=300000)
-  public void testCyclicReplication() throws Exception {
-    LOG.info("testCyclicReplication");
-    utility1.startMiniCluster();
-    utility2.startMiniCluster();
-    utility3.startMiniCluster();
-    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
-    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
-    ReplicationAdmin admin3 = new ReplicationAdmin(conf3);
+      htables = getHTablesOnClusters(tableName);
-    new HBaseAdmin(conf1).createTable(table);
-    new HBaseAdmin(conf2).createTable(table);
-    new HBaseAdmin(conf3).createTable(table);
-    HTable htable1 = new HTable(conf1, tableName);
-    htable1.setWriteBufferSize(1024);
-    HTable htable2 = new HTable(conf2, tableName);
-    htable2.setWriteBufferSize(1024);
-    HTable htable3 = new HTable(conf3, tableName);
-    htable3.setWriteBufferSize(1024);
-
-    admin1.addPeer("1", utility2.getClusterKey());
-    admin2.addPeer("1", utility3.getClusterKey());
-    admin3.addPeer("1", utility1.getClusterKey());
+      // Test the replication scenario of 0 -> 1 -> 0
+      addPeer("1", 0, 1);
+      addPeer("1", 1, 0);
-    // put "row" and wait 'til it got around
-    putAndWait(row, famName, htable1, htable3);
-    // it should have passed through table2
-    check(row,famName,htable2);
+      int[] expectedCounts = new int[] { 2, 2 };
-    putAndWait(row1, famName, htable2, htable1);
-    check(row,famName,htable3);
-    putAndWait(row2, famName, htable3, htable2);
-    check(row,famName,htable1);
-
-    deleteAndWait(row,htable1,htable3);
-    deleteAndWait(row1,htable2,htable1);
-    deleteAndWait(row2,htable3,htable2);
+      // add rows to both clusters,
+      // make sure they are both replicated
+      putAndWait(row, famName, htables[0], htables[1]);
+      putAndWait(row1, famName, htables[1], htables[0]);
+      validateCounts(htables, put, expectedCounts);
-    assertEquals("Puts were replicated back ", 3, getCount(htable1, put));
-    assertEquals("Puts were replicated back ", 3, getCount(htable2, put));
-    assertEquals("Puts were replicated back ", 3, getCount(htable3, put));
-    assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete));
-    assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete));
-    assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete));
-
-    // Test HBASE-9158
-    admin2.disablePeer("1");
-    // we now have an edit that was replicated into cluster originating from cluster 1
-    putAndWait(row3, famName, htable1, htable2);
-    // now add a local edit to cluster 2
-    Put put = new Put(row4);
-    put.add(famName, row4, row4);
-    htable2.put(put);
-    // reenable replication from cluster 2 to cluster 3
-    admin2.enablePeer("1");
-    // without HBASE-9158 the edit for row4 would have been marked with cluster 1's id
-    // and hence not replicated to cluster 1
-    wait(row4, htable1);
-
-    utility3.shutdownMiniCluster();
-    utility2.shutdownMiniCluster();
-    utility1.shutdownMiniCluster();
+      deleteAndWait(row, htables[0], htables[1]);
+      deleteAndWait(row1, htables[1], htables[0]);
+      validateCounts(htables, delete, expectedCounts);
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
   }

   /**
-   * Add a row to a table in each cluster, check it's replicated,
-   * delete it, check's gone
-   * Also check the puts and deletes are not replicated back to
-   * the originating cluster.
+   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
+   * deleting rows to a table in each cluster and ensuring that each of
+   * these clusters gets the appropriate mutations. It also tests the grouping
+   * scenario where a cluster needs to replicate the edits originating from
+   * itself and also the edits that it received using replication from a
+   * different cluster. The scenario is explained in HBASE-9158.
    */
-  @Test(timeout=300000)
-  public void testSimplePutDelete() throws Exception {
-    LOG.info("testSimplePutDelete");
-    utility1.startMiniCluster();
-    utility2.startMiniCluster();
+  @Test(timeout = 300000)
+  public void testCyclicReplication2() throws Exception {
+    LOG.info("testCyclicReplication2");
+    int numClusters = 3;
+    HTable[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
-    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
-    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
+      // Test the replication scenario of 0 -> 1 -> 2 -> 0
+      addPeer("1", 0, 1);
+      addPeer("1", 1, 2);
+      addPeer("1", 2, 0);
-    new HBaseAdmin(conf1).createTable(table);
-    new HBaseAdmin(conf2).createTable(table);
-    HTable htable1 = new HTable(conf1, tableName);
-    htable1.setWriteBufferSize(1024);
-    HTable htable2 = new HTable(conf2, tableName);
-    htable2.setWriteBufferSize(1024);
+      htables = getHTablesOnClusters(tableName);
-    // set M-M
-    admin1.addPeer("1", utility2.getClusterKey());
-    admin2.addPeer("1", utility1.getClusterKey());
+      // put "row" and wait 'til it got around
+      putAndWait(row, famName, htables[0], htables[2]);
+      putAndWait(row1, famName, htables[1], htables[0]);
+      putAndWait(row2, famName, htables[2], htables[1]);
-    // add rows to both clusters,
-    // make sure they are both replication
-    putAndWait(row, famName, htable1, htable2);
-    putAndWait(row1, famName, htable2, htable1);
+      deleteAndWait(row, htables[0], htables[2]);
+      deleteAndWait(row1, htables[1], htables[0]);
+      deleteAndWait(row2, htables[2], htables[1]);
-    // make sure "row" did not get replicated back.
-    assertEquals("Puts were replicated back ", 2, getCount(htable1, put));
+      int[] expectedCounts = new int[] { 3, 3, 3 };
+      validateCounts(htables, put, expectedCounts);
+      validateCounts(htables, delete, expectedCounts);
-    // delete "row" and wait
-    deleteAndWait(row, htable1, htable2);
-
-    // make the 2nd cluster replicated back
-    assertEquals("Puts were replicated back ", 2, getCount(htable2, put));
-
-    deleteAndWait(row1, htable2, htable1);
-
-    assertEquals("Deletes were replicated back ", 2, getCount(htable1, delete));
-    utility2.shutdownMiniCluster();
-    utility1.shutdownMiniCluster();
+      // Test HBASE-9158
+      disablePeer("1", 2);
+      // we now have an edit that was replicated into cluster originating from
+      // cluster 0
+      putAndWait(row3, famName, htables[0], htables[1]);
+      // now add a local edit to cluster 1
+      htables[1].put(new Put(row4).add(famName, row4, row4));
+      // re-enable replication from cluster 2 to cluster 0
+      enablePeer("1", 2);
+      // without HBASE-9158 the edit for row4 would have been marked with
+      // cluster 0's id
+      // and hence not replicated to cluster 0
+      wait(row4, htables[0], true);
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
   }

-  private int getCount(HTable t, byte[] type) throws IOException {
+  /**
+   * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
+   */
+  @Test(timeout = 300000)
+  public void testCyclicReplication3() throws Exception {
+    LOG.info("testCyclicReplication3");
+    int numClusters = 3;
+    HTable[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
+
+      // Test the replication scenario of 0 -> 1 -> 2 -> 1
+      addPeer("1", 0, 1);
+      addPeer("1", 1, 2);
+      addPeer("1", 2, 1);
+
+      htables = getHTablesOnClusters(tableName);
+
+      // put "row" and wait 'til it got around
+      putAndWait(row, famName, htables[0], htables[2]);
+      putAndWait(row1, famName, htables[1], htables[2]);
+      putAndWait(row2, famName, htables[2], htables[1]);
+
+      deleteAndWait(row, htables[0], htables[2]);
+      deleteAndWait(row1, htables[1], htables[2]);
+      deleteAndWait(row2, htables[2], htables[1]);
+
+      int[] expectedCounts = new int[] { 1, 3, 3 };
+      validateCounts(htables, put, expectedCounts);
+      validateCounts(htables, delete, expectedCounts);
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    configurations = null;
+    utilities = null;
+  }
+
+  @SuppressWarnings("resource")
+  private void startMiniClusters(int numClusters) throws Exception {
+    Random random = new Random();
+    utilities = new HBaseTestingUtility[numClusters];
+    configurations = new Configuration[numClusters];
+    for (int i = 0; i < numClusters; i++) {
+      Configuration conf = new Configuration(baseConfiguration);
+      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
+      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
+      if (i == 0) {
+        utility.startMiniZKCluster();
+        miniZK = utility.getZkCluster();
+      } else {
+        utility.setZkCluster(miniZK);
+      }
+      utility.startMiniCluster();
+      utilities[i] = utility;
+      configurations[i] = conf;
+      new ZooKeeperWatcher(conf, "cluster" + i, null, true);
+    }
+  }
+
+  private void shutDownMiniClusters() throws Exception {
+    int numClusters = utilities.length;
+    for (int i = numClusters - 1; i >= 0; i--) {
+      if (utilities[i] != null) {
+        utilities[i].shutdownMiniCluster();
+      }
+    }
+    miniZK.shutdown();
+  }
+
+  private void createTableOnClusters(HTableDescriptor table) throws Exception {
+    int numClusters = configurations.length;
+    for (int i = 0; i < numClusters; i++) {
+      HBaseAdmin hbaseAdmin = null;
+      try {
+        hbaseAdmin = new HBaseAdmin(configurations[i]);
+        hbaseAdmin.createTable(table);
+      } finally {
+        close(hbaseAdmin);
+      }
+    }
+  }
+
+  private void addPeer(String id, int masterClusterNumber,
+      int slaveClusterNumber) throws Exception {
+    ReplicationAdmin replicationAdmin = null;
+    try {
+      replicationAdmin = new ReplicationAdmin(
+          configurations[masterClusterNumber]);
+      replicationAdmin.addPeer(id,
+          utilities[slaveClusterNumber].getClusterKey());
+    } finally {
+      close(replicationAdmin);
+    }
+  }
+
+  private void disablePeer(String id, int masterClusterNumber) throws Exception {
+    ReplicationAdmin replicationAdmin = null;
+    try {
+      replicationAdmin = new ReplicationAdmin(
+          configurations[masterClusterNumber]);
+      replicationAdmin.disablePeer(id);
+    } finally {
+      close(replicationAdmin);
+    }
+  }
+
+  private void enablePeer(String id, int masterClusterNumber) throws Exception {
+    ReplicationAdmin replicationAdmin = null;
+    try {
+      replicationAdmin = new ReplicationAdmin(
+          configurations[masterClusterNumber]);
+      replicationAdmin.enablePeer(id);
+    } finally {
+      close(replicationAdmin);
+    }
+  }
+
+  private void close(Closeable... closeables) {
+    try {
+      if (closeables != null) {
+        for (Closeable closeable : closeables) {
+          closeable.close();
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception occurred while closing the object:", e);
+    }
+  }
+
+  @SuppressWarnings("resource")
+  private HTable[] getHTablesOnClusters(byte[] tableName) throws Exception {
+    int numClusters = utilities.length;
+    HTable[] htables = new HTable[numClusters];
+    for (int i = 0; i < numClusters; i++) {
+      HTable htable = new HTable(configurations[i], tableName);
+      htable.setWriteBufferSize(1024);
+      htables[i] = htable;
+    }
+    return htables;
+  }
+
+  private void validateCounts(HTable[] htables, byte[] type,
+      int[] expectedCounts) throws IOException {
+    for (int i = 0; i < htables.length; i++) {
+      assertEquals(Bytes.toString(type) + " were replicated back ",
+          expectedCounts[i], getCount(htables[i], type));
+    }
+  }
+
+  private int getCount(HTable t, byte[] type) throws IOException {
     Get test = new Get(row);
-    test.setAttribute("count", new byte[]{});
+    test.setAttribute("count", new byte[] {});
     Result res = t.get(test);
     return Bytes.toInt(res.getValue(count, type));
   }

   private void deleteAndWait(byte[] row, HTable source, HTable target)
-  throws Exception {
+      throws Exception {
     Delete del = new Delete(row);
     source.delete(del);
-
-    Get get = new Get(row);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for del replication");
-      }
-      Result res = target.get(get);
-      if (res.size() >= 1) {
-        LOG.info("Row not deleted");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        break;
-      }
-    }
-  }
-
-  private void check(byte[] row, byte[] fam, HTable t) throws IOException {
-    Get get = new Get(row);
-    Result res = t.get(get);
-    if (res.size() == 0) {
-      fail("Row is missing");
-    }
+    wait(row, target, true);
   }

   private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target)
-  throws Exception {
+      throws Exception {
     Put put = new Put(row);
     put.add(fam, row, row);
     source.put(put);
-
-    wait(row, target);
+    wait(row, target, false);
   }

-  private void wait(byte[] row, HTable target) throws Exception {
+  private void wait(byte[] row, HTable target, boolean isDeleted)
+      throws Exception {
     Get get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for put replication");
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for replication. Row:" + Bytes.toString(row)
+            + ". IsDeleteReplication:" + isDeleted);
       }
       Result res = target.get(get);
-      if (res.size() == 0) {
-        LOG.info("Row not available");
+      boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
+      if (sleep) {
+        LOG.info("Waiting for more time for replication. Row:"
+            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
         Thread.sleep(SLEEP_TIME);
       } else {
-        assertArrayEquals(res.value(), row);
+        if (!isDeleted) {
+          assertArrayEquals(res.value(), row);
+        }
+        LOG.info("Obtained row:"
+            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
         break;
       }
-    }
+    }
   }

   /**
-   * Use a coprocessor to count puts and deletes.
-   * as KVs would be replicated back with the same timestamp
-   * there is otherwise no way to count them.
+   * Use a coprocessor to count puts and deletes. As KVs would be replicated back with the same
+   * timestamp there is otherwise no way to count them.
   */
  public static class CoprocessorCounter extends BaseRegionObserver {
    private int nCount = 0;
    private int nDelete = 0;

    @Override
-    public void prePut(final ObserverContext e,
-        final Put put, final WALEdit edit,
-        final Durability durability)
-        throws IOException {
+    public void prePut(final ObserverContext e, final Put put,
+        final WALEdit edit, final Durability durability) throws IOException {
      nCount++;
    }
+
    @Override
    public void postDelete(final ObserverContext c,
-        final Delete delete, final WALEdit edit,
-        final Durability durability)
-        throws IOException {
+        final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
      nDelete++;
    }
+
    @Override
-    public void preGet(final ObserverContext c,
-        final Get get, final List result) throws IOException {
+    public void preGet(final ObserverContext c, final Get get,
+        final List result) throws IOException {
      if (get.getAttribute("count") != null) {
        result.clear();
        // order is important!
@@ -344,5 +440,4 @@
    }
  }

-}
-
+}
\ No newline at end of file