diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 9fc9cc68041..8c51e70c517 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -104,14 +104,14 @@ public class ClientSmallScanner extends ClientScanner { if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || checkScanStopRow(endKey) || done) { close(); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished with small scan at " + this.currentRegion); + if (LOG.isTraceEnabled()) { + LOG.trace("Finished with small scan at " + this.currentRegion); } return false; } localStartKey = endKey; - if (LOG.isDebugEnabled()) { - LOG.debug("Finished with region " + this.currentRegion); + if (LOG.isTraceEnabled()) { + LOG.trace("Finished with region " + this.currentRegion); } } else if (this.lastResult != null) { localStartKey = this.lastResult.getRow(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index dae2499c86d..a035e2fa19b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -94,6 +94,7 @@ public final class ConnectionUtils { */ public static void setServerSideHConnectionRetriesConfig( final Configuration c, final String sn, final Log log) { + // TODO: Fix this. Not all connections from server side should have 10 times the retries. int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); // Go big. Multiply by 10. If we can't get to meta after this many retries diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java new file mode 100644 index 00000000000..b2c4a57c0c9 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.mortbay.log.Log; + +import com.google.protobuf.ServiceException; + +/** + * A Callable for flushRegion() RPC. + */ +@InterfaceAudience.Private +public class FlushRegionCallable extends RegionAdminServiceCallable { + + private final byte[] regionName; + private final boolean writeFlushWalMarker; + private boolean reload; + + public FlushRegionCallable(ClusterConnection connection, + RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] regionName, + byte[] regionStartKey, boolean writeFlushWalMarker) { + super(connection, rpcControllerFactory, tableName, regionStartKey); + this.regionName = regionName; + this.writeFlushWalMarker = writeFlushWalMarker; + } + + public FlushRegionCallable(ClusterConnection connection, + RpcControllerFactory rpcControllerFactory, HRegionInfo regionInfo, + boolean writeFlushWalMarker) { + this(connection, rpcControllerFactory, regionInfo.getTable(), regionInfo.getRegionName(), + regionInfo.getStartKey(), writeFlushWalMarker); + } + + @Override + public FlushRegionResponse call(int callTimeout) throws Exception { + return flushRegion(); + } + + @Override + public void prepare(boolean reload) throws IOException { + super.prepare(reload); + this.reload = reload; + } + + private FlushRegionResponse flushRegion() throws IOException { + // check whether we should still do the flush to this region. 
If the regions are changed due + // to splits or merges, etc return success + if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) { + if (!reload) { + throw new IOException("Cached location seems to be different than requested region."); + } + Log.info("Skipping flush region, because the located region " + + Bytes.toStringBinary(location.getRegionInfo().getRegionName()) + " is different than " + + " requested region " + Bytes.toStringBinary(regionName)); + return FlushRegionResponse.newBuilder() + .setLastFlushTime(EnvironmentEdgeManager.currentTime()) + .setFlushed(false) + .setWroteFlushWalMarker(false) + .build(); + } + + FlushRegionRequest request = + RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker); + + try { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + return stub.flushRegion(controller, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java index 66dcdce1d72..189dbaa2c96 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java @@ -23,13 +23,17 @@ import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.SocketTimeoutException; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.util.Bytes; /** * Similar to {@link RegionServerCallable} but for the AdminService interface. 
This service callable @@ -42,25 +46,39 @@ public abstract class RegionAdminServiceCallable implements RetryingCallable< protected final ClusterConnection connection; + protected final RpcControllerFactory rpcControllerFactory; + protected AdminService.BlockingInterface stub; protected HRegionLocation location; protected final TableName tableName; protected final byte[] row; + protected final int replicaId; protected final static int MIN_WAIT_DEAD_SERVER = 10000; - public RegionAdminServiceCallable(ClusterConnection connection, TableName tableName, byte[] row) { - this(connection, null, tableName, row); + public RegionAdminServiceCallable(ClusterConnection connection, + RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) { + this(connection, rpcControllerFactory, null, tableName, row); } - public RegionAdminServiceCallable(ClusterConnection connection, HRegionLocation location, + public RegionAdminServiceCallable(ClusterConnection connection, + RpcControllerFactory rpcControllerFactory, HRegionLocation location, TableName tableName, byte[] row) { + this(connection, rpcControllerFactory, location, + tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); + } + + public RegionAdminServiceCallable(ClusterConnection connection, + RpcControllerFactory rpcControllerFactory, HRegionLocation location, + TableName tableName, byte[] row, int replicaId) { this.connection = connection; + this.rpcControllerFactory = rpcControllerFactory; this.location = location; this.tableName = tableName; this.row = row; + this.replicaId = replicaId; } @Override @@ -85,7 +103,18 @@ public abstract class RegionAdminServiceCallable implements RetryingCallable< this.stub = stub; } - public abstract HRegionLocation getLocation(boolean useCache) throws IOException; + public HRegionLocation getLocation(boolean useCache) throws IOException { + RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId); + if (rl == null) { + throw new HBaseIOException(getExceptionMessage()); + } + HRegionLocation location = rl.getRegionLocation(replicaId); + if (location == null) { + throw new HBaseIOException(getExceptionMessage()); + } + + return location; + } @Override public void throwable(Throwable t, boolean retrying) { @@ -115,7 +144,8 @@ public abstract class RegionAdminServiceCallable implements RetryingCallable< //subclasses can override this. 
protected String getExceptionMessage() { - return "There is no location"; + return "There is no location" + " table=" + tableName + + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row); } @Override @@ -132,4 +162,27 @@ public abstract class RegionAdminServiceCallable implements RetryingCallable< } return sleep; } + + public static RegionLocations getRegionLocations( + ClusterConnection connection, TableName tableName, byte[] row, + boolean useCache, int replicaId) + throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { + RegionLocations rl; + try { + rl = connection.locateRegion(tableName, row, useCache, true, replicaId); + } catch (DoNotRetryIOException e) { + throw e; + } catch (RetriesExhaustedException e) { + throw e; + } catch (InterruptedIOException e) { + throw e; + } catch (IOException e) { + throw new RetriesExhaustedException("Can't get the location", e); + } + if (rl == null) { + throw new RetriesExhaustedException("Can't get the locations"); + } + + return rl; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 9764efdc70d..ac76edbdeae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -37,22 +37,22 @@ public enum EventType { // Messages originating from RS (NOTE: there is NO direct communication from // RS to Master). These are a result of RS updates into ZK. // RS_ZK_REGION_CLOSING (1), // It is replaced by M_ZK_REGION_CLOSING(HBASE-4739) - + /** * RS_ZK_REGION_CLOSED
- * + * * RS has finished closing a region. */ RS_ZK_REGION_CLOSED (2, ExecutorType.MASTER_CLOSE_REGION), /** * RS_ZK_REGION_OPENING
- * + * * RS is in process of opening a region. */ RS_ZK_REGION_OPENING (3, null), /** * RS_ZK_REGION_OPENED
- * + * * RS has finished opening a region. */ RS_ZK_REGION_OPENED (4, ExecutorType.MASTER_OPEN_REGION), @@ -70,7 +70,7 @@ public enum EventType { RS_ZK_REGION_SPLIT (6, ExecutorType.MASTER_SERVER_OPERATIONS), /** * RS_ZK_REGION_FAILED_OPEN
- * + * * RS failed to open a region. */ RS_ZK_REGION_FAILED_OPEN (7, ExecutorType.MASTER_CLOSE_REGION), @@ -217,7 +217,7 @@ public enum EventType { * Master adds this region as closing in ZK */ M_ZK_REGION_CLOSING (51, null), - + /** * Master controlled events to be executed on the master * M_SERVER_SHUTDOWN @@ -232,14 +232,14 @@ public enum EventType { M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS), /** * Master controlled events to be executed on the master.
- * + * * M_MASTER_RECOVERY
* Master is processing recovery of regions found in ZK RIT */ M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS), /** * Master controlled events to be executed on the master.
- * + * * M_LOG_REPLAY
* Master is processing log replay of failed region server */ @@ -247,18 +247,25 @@ public enum EventType { /** * RS controlled events to be executed on the RS.
- * + * * RS_PARALLEL_SEEK */ RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK), - + /** * RS wal recovery work items(either creating recover.edits or directly replay wals) * to be executed on the RS.
- * + * * RS_LOG_REPLAY */ - RS_LOG_REPLAY (81, ExecutorType.RS_LOG_REPLAY_OPS); + RS_LOG_REPLAY (81, ExecutorType.RS_LOG_REPLAY_OPS), + + /** + * RS flush triggering from secondary region replicas to primary region replica.
+ * + * RS_REGION_REPLICA_FLUSH + */ + RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS); private final int code; private final ExecutorType executor; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 5590b0a782c..d0f6beedbb6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -45,7 +45,8 @@ public enum ExecutorType { RS_CLOSE_ROOT (24), RS_CLOSE_META (25), RS_PARALLEL_SEEK (26), - RS_LOG_REPLAY_OPS (27); + RS_LOG_REPLAY_OPS (27), + RS_REGION_REPLICA_FLUSH_OPS (28); ExecutorType(int value) {} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index d23aa023464..508cf39e632 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -739,10 +739,22 @@ public final class RequestConverter { */ public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName) { + return buildFlushRegionRequest(regionName, false); + } + + /** + * Create a protocol buffer FlushRegionRequest for a given region name + * + * @param regionName the name of the region to get info + * @return a protocol buffer FlushRegionRequest + */ + public static FlushRegionRequest + buildFlushRegionRequest(final byte[] regionName, boolean writeFlushWALMarker) { FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder(); RegionSpecifier region = buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); builder.setRegion(region); + builder.setWriteFlushWalMarker(writeFlushWALMarker); return builder.build(); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java index c88cae335f6..73512fa21bb 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java @@ -152,7 +152,9 @@ public class RetryCounter { public void sleepUntilNextRetry() throws InterruptedException { int attempts = getAttemptTimes(); long sleepTime = retryConfig.backoffPolicy.getBackoffTime(retryConfig, attempts); - LOG.info("Sleeping " + sleepTime + "ms before retry #" + attempts + "..."); + if (LOG.isTraceEnabled()) { + LOG.trace("Sleeping " + sleepTime + "ms before retry #" + attempts + "..."); + } retryConfig.getTimeUnit().sleep(sleepTime); useRetry(); } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java index 30da5c01600..33b2554cc71 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java @@ -112,11 +112,6 @@ public class IntegrationTestRegionReplicaReplication extends IntegrationTestInge runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20); } - @Override - protected void startMonkey() throws Exception { - // TODO: disabled for now - } - /** * This extends MultiThreadedWriter to add a configurable delay to the 
keys written by the writer * threads to become available to the MultiThradedReader threads. We add this delay because of diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RemoveColumnAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RemoveColumnAction.java index efb44134503..c083d9c12a4 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RemoveColumnAction.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RemoveColumnAction.java @@ -54,7 +54,7 @@ public class RemoveColumnAction extends Action { HTableDescriptor tableDescriptor = admin.getTableDescriptor(tableName); HColumnDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies(); - if (columnDescriptors.length <= 1) { + if (columnDescriptors.length <= (protectedColumns == null ? 1 : protectedColumns.size())) { return; } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index 382874289df..ea022b5a84e 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -7996,6 +7996,24 @@ public final class AdminProtos { * optional uint64 if_older_than_ts = 2; */ long getIfOlderThanTs(); + + // optional bool write_flush_wal_marker = 3; + /** + * optional bool write_flush_wal_marker = 3; + * + *
+     * whether to write a marker to WAL even if not flushed
+     * 
+ */ + boolean hasWriteFlushWalMarker(); + /** + * optional bool write_flush_wal_marker = 3; + * + *
+     * whether to write a marker to WAL even if not flushed
+     * 
+ */ + boolean getWriteFlushWalMarker(); } /** * Protobuf type {@code FlushRegionRequest} @@ -8073,6 +8091,11 @@ public final class AdminProtos { ifOlderThanTs_ = input.readUInt64(); break; } + case 24: { + bitField0_ |= 0x00000004; + writeFlushWalMarker_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -8151,9 +8174,34 @@ public final class AdminProtos { return ifOlderThanTs_; } + // optional bool write_flush_wal_marker = 3; + public static final int WRITE_FLUSH_WAL_MARKER_FIELD_NUMBER = 3; + private boolean writeFlushWalMarker_; + /** + * optional bool write_flush_wal_marker = 3; + * + *
+     * whether to write a marker to WAL even if not flushed
+     * 
+ */ + public boolean hasWriteFlushWalMarker() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional bool write_flush_wal_marker = 3; + * + *
+     * whether to write a marker to WAL even if not flushed
+     * 
+ */ + public boolean getWriteFlushWalMarker() { + return writeFlushWalMarker_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); ifOlderThanTs_ = 0L; + writeFlushWalMarker_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -8181,6 +8229,9 @@ public final class AdminProtos { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeUInt64(2, ifOlderThanTs_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBool(3, writeFlushWalMarker_); + } getUnknownFields().writeTo(output); } @@ -8198,6 +8249,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(2, ifOlderThanTs_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(3, writeFlushWalMarker_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -8231,6 +8286,11 @@ public final class AdminProtos { result = result && (getIfOlderThanTs() == other.getIfOlderThanTs()); } + result = result && (hasWriteFlushWalMarker() == other.hasWriteFlushWalMarker()); + if (hasWriteFlushWalMarker()) { + result = result && (getWriteFlushWalMarker() + == other.getWriteFlushWalMarker()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -8252,6 +8312,10 @@ public final class AdminProtos { hash = (37 * hash) + IF_OLDER_THAN_TS_FIELD_NUMBER; hash = (53 * hash) + hashLong(getIfOlderThanTs()); } + if (hasWriteFlushWalMarker()) { + hash = (37 * hash) + WRITE_FLUSH_WAL_MARKER_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getWriteFlushWalMarker()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -8377,6 +8441,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000001); ifOlderThanTs_ = 0L; bitField0_ = (bitField0_ & ~0x00000002); + writeFlushWalMarker_ = false; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -8417,6 +8483,10 @@ public final class AdminProtos { to_bitField0_ |= 0x00000002; } result.ifOlderThanTs_ = ifOlderThanTs_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.writeFlushWalMarker_ = writeFlushWalMarker_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -8439,6 +8509,9 @@ public final class AdminProtos { if (other.hasIfOlderThanTs()) { setIfOlderThanTs(other.getIfOlderThanTs()); } + if (other.hasWriteFlushWalMarker()) { + setWriteFlushWalMarker(other.getWriteFlushWalMarker()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -8624,6 +8697,55 @@ public final class AdminProtos { return this; } + // optional bool write_flush_wal_marker = 3; + private boolean writeFlushWalMarker_ ; + /** + * optional bool write_flush_wal_marker = 3; + * + *
+       * whether to write a marker to WAL even if not flushed
+       * 
+ */ + public boolean hasWriteFlushWalMarker() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional bool write_flush_wal_marker = 3; + * + *
+       * whether to write a marker to WAL even if not flushed
+       * 
+ */ + public boolean getWriteFlushWalMarker() { + return writeFlushWalMarker_; + } + /** + * optional bool write_flush_wal_marker = 3; + * + *
+       * whether to write a marker to WAL even if not flushed
+       * 
+ */ + public Builder setWriteFlushWalMarker(boolean value) { + bitField0_ |= 0x00000004; + writeFlushWalMarker_ = value; + onChanged(); + return this; + } + /** + * optional bool write_flush_wal_marker = 3; + * + *
+       * whether to write a marker to WAL even if not flushed
+       * 
+ */ + public Builder clearWriteFlushWalMarker() { + bitField0_ = (bitField0_ & ~0x00000004); + writeFlushWalMarker_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:FlushRegionRequest) } @@ -8657,6 +8779,16 @@ public final class AdminProtos { * optional bool flushed = 2; */ boolean getFlushed(); + + // optional bool wrote_flush_wal_marker = 3; + /** + * optional bool wrote_flush_wal_marker = 3; + */ + boolean hasWroteFlushWalMarker(); + /** + * optional bool wrote_flush_wal_marker = 3; + */ + boolean getWroteFlushWalMarker(); } /** * Protobuf type {@code FlushRegionResponse} @@ -8719,6 +8851,11 @@ public final class AdminProtos { flushed_ = input.readBool(); break; } + case 24: { + bitField0_ |= 0x00000004; + wroteFlushWalMarker_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -8791,9 +8928,26 @@ public final class AdminProtos { return flushed_; } + // optional bool wrote_flush_wal_marker = 3; + public static final int WROTE_FLUSH_WAL_MARKER_FIELD_NUMBER = 3; + private boolean wroteFlushWalMarker_; + /** + * optional bool wrote_flush_wal_marker = 3; + */ + public boolean hasWroteFlushWalMarker() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional bool wrote_flush_wal_marker = 3; + */ + public boolean getWroteFlushWalMarker() { + return wroteFlushWalMarker_; + } + private void initFields() { lastFlushTime_ = 0L; flushed_ = false; + wroteFlushWalMarker_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -8817,6 +8971,9 @@ public final class AdminProtos { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBool(2, flushed_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBool(3, wroteFlushWalMarker_); + } getUnknownFields().writeTo(output); } @@ -8834,6 +8991,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(2, flushed_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(3, wroteFlushWalMarker_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -8867,6 +9028,11 @@ public final class AdminProtos { result = result && (getFlushed() == other.getFlushed()); } + result = result && (hasWroteFlushWalMarker() == other.hasWroteFlushWalMarker()); + if (hasWroteFlushWalMarker()) { + result = result && (getWroteFlushWalMarker() + == other.getWroteFlushWalMarker()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -8888,6 +9054,10 @@ public final class AdminProtos { hash = (37 * hash) + FLUSHED_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getFlushed()); } + if (hasWroteFlushWalMarker()) { + hash = (37 * hash) + WROTE_FLUSH_WAL_MARKER_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getWroteFlushWalMarker()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -9001,6 +9171,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000001); flushed_ = false; bitField0_ = (bitField0_ & ~0x00000002); + wroteFlushWalMarker_ = false; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -9037,6 +9209,10 @@ public final class AdminProtos { to_bitField0_ |= 0x00000002; } result.flushed_ = flushed_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.wroteFlushWalMarker_ = wroteFlushWalMarker_; 
result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -9059,6 +9235,9 @@ public final class AdminProtos { if (other.hasFlushed()) { setFlushed(other.getFlushed()); } + if (other.hasWroteFlushWalMarker()) { + setWroteFlushWalMarker(other.getWroteFlushWalMarker()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -9156,6 +9335,39 @@ public final class AdminProtos { return this; } + // optional bool wrote_flush_wal_marker = 3; + private boolean wroteFlushWalMarker_ ; + /** + * optional bool wrote_flush_wal_marker = 3; + */ + public boolean hasWroteFlushWalMarker() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional bool wrote_flush_wal_marker = 3; + */ + public boolean getWroteFlushWalMarker() { + return wroteFlushWalMarker_; + } + /** + * optional bool wrote_flush_wal_marker = 3; + */ + public Builder setWroteFlushWalMarker(boolean value) { + bitField0_ |= 0x00000004; + wroteFlushWalMarker_ = value; + onChanged(); + return this; + } + /** + * optional bool wrote_flush_wal_marker = 3; + */ + public Builder clearWroteFlushWalMarker() { + bitField0_ = (bitField0_ & ~0x00000004); + wroteFlushWalMarker_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:FlushRegionResponse) } @@ -22073,66 +22285,67 @@ public final class AdminProtos { "n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004" + " \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" + "(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(", - "\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" + + "\010\"p\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" + "\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " + - "\001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flush" + - "_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitReg" + - "ionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" + - "fier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegion" + - "Response\"W\n\024CompactRegionRequest\022 \n\006regi" + - "on\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 \001(" + - "\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionRespon" + - "se\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013upda", - "te_info\030\001 \003(\0132+.UpdateFavoredNodesReques" + - "t.RegionUpdateInfo\032S\n\020RegionUpdateInfo\022\033" + - "\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored_n" + - "odes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavored" + - "NodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Merge" + - "RegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Regio" + - "nSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionSpe" + - "cifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Merge" + - "RegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\013" + - "2\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as", - "sociated_cell_count\030\003 \001(\005\"4\n\030ReplicateWA" + - "LEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"" + - "\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWALW" + - "riterRequest\"0\n\025RollWALWriterResponse\022\027\n" + - "\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRequ" + - "est\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespons" + - "e\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo\022" + - " \n\013server_name\030\001 
\002(\0132\013.ServerName\022\022\n\nweb" + - "ui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022 " + - "\n\013server_info\030\001 \002(\0132\013.ServerInfo\"\034\n\032Upda", - "teConfigurationRequest\"\035\n\033UpdateConfigur" + - "ationResponse2\230\010\n\014AdminService\022>\n\rGetReg" + - "ionInfo\022\025.GetRegionInfoRequest\032\026.GetRegi" + - "onInfoResponse\022;\n\014GetStoreFile\022\024.GetStor" + - "eFileRequest\032\025.GetStoreFileResponse\022D\n\017G" + - "etOnlineRegion\022\027.GetOnlineRegionRequest\032" + - "\030.GetOnlineRegionResponse\0225\n\nOpenRegion\022" + - "\022.OpenRegionRequest\032\023.OpenRegionResponse" + - "\0228\n\013CloseRegion\022\023.CloseRegionRequest\032\024.C" + - "loseRegionResponse\0228\n\013FlushRegion\022\023.Flus", - "hRegionRequest\032\024.FlushRegionResponse\0228\n\013" + - "SplitRegion\022\023.SplitRegionRequest\032\024.Split" + - "RegionResponse\022>\n\rCompactRegion\022\025.Compac" + - "tRegionRequest\032\026.CompactRegionResponse\022;" + - "\n\014MergeRegions\022\024.MergeRegionsRequest\032\025.M" + - "ergeRegionsResponse\022J\n\021ReplicateWALEntry" + - "\022\031.ReplicateWALEntryRequest\032\032.ReplicateW" + - "ALEntryResponse\022?\n\006Replay\022\031.ReplicateWAL" + - "EntryRequest\032\032.ReplicateWALEntryResponse" + - "\022>\n\rRollWALWriter\022\025.RollWALWriterRequest", - "\032\026.RollWALWriterResponse\022>\n\rGetServerInf" + - "o\022\025.GetServerInfoRequest\032\026.GetServerInfo" + - "Response\0225\n\nStopServer\022\022.StopServerReque" + - "st\032\023.StopServerResponse\022M\n\022UpdateFavored" + - "Nodes\022\032.UpdateFavoredNodesRequest\032\033.Upda" + - "teFavoredNodesResponse\022P\n\023UpdateConfigur" + - "ation\022\033.UpdateConfigurationRequest\032\034.Upd" + - "ateConfigurationResponseBA\n*org.apache.h" + - "adoop.hbase.protobuf.generatedB\013AdminPro" + - "tosH\001\210\001\001\240\001\001" + "\001(\004\022\036\n\026write_flush_wal_marker\030\003 \001(\010\"_\n\023F" + + "lushRegionResponse\022\027\n\017last_flush_time\030\001 " + + "\002(\004\022\017\n\007flushed\030\002 \001(\010\022\036\n\026wrote_flush_wal_" + + "marker\030\003 \001(\010\"K\n\022SplitRegionRequest\022 \n\006re" + + "gion\030\001 \002(\0132\020.RegionSpecifier\022\023\n\013split_po" + + "int\030\002 \001(\014\"\025\n\023SplitRegionResponse\"W\n\024Comp" + + "actRegionRequest\022 \n\006region\030\001 \002(\0132\020.Regio" + + "nSpecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(", + "\014\"\027\n\025CompactRegionResponse\"\262\001\n\031UpdateFav" + + "oredNodesRequest\022@\n\013update_info\030\001 \003(\0132+." 
+ + "UpdateFavoredNodesRequest.RegionUpdateIn" + + "fo\032S\n\020RegionUpdateInfo\022\033\n\006region\030\001 \002(\0132\013" + + ".RegionInfo\022\"\n\rfavored_nodes\030\002 \003(\0132\013.Ser" + + "verName\".\n\032UpdateFavoredNodesResponse\022\020\n" + + "\010response\030\001 \001(\r\"v\n\023MergeRegionsRequest\022\"" + + "\n\010region_a\030\001 \002(\0132\020.RegionSpecifier\022\"\n\010re" + + "gion_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010forcib" + + "le\030\003 \001(\010:\005false\"\026\n\024MergeRegionsResponse\"", + "X\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key" + + "_value_bytes\030\002 \003(\014\022\035\n\025associated_cell_co" + + "unt\030\003 \001(\005\"4\n\030ReplicateWALEntryRequest\022\030\n" + + "\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALE" + + "ntryResponse\"\026\n\024RollWALWriterRequest\"0\n\025" + + "RollWALWriterResponse\022\027\n\017region_to_flush" + + "\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 " + + "\002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerIn" + + "foRequest\"B\n\nServerInfo\022 \n\013server_name\030\001" + + " \002(\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n", + "\025GetServerInfoResponse\022 \n\013server_info\030\001 " + + "\002(\0132\013.ServerInfo\"\034\n\032UpdateConfigurationR" + + "equest\"\035\n\033UpdateConfigurationResponse2\230\010" + + "\n\014AdminService\022>\n\rGetRegionInfo\022\025.GetReg" + + "ionInfoRequest\032\026.GetRegionInfoResponse\022;" + + "\n\014GetStoreFile\022\024.GetStoreFileRequest\032\025.G" + + "etStoreFileResponse\022D\n\017GetOnlineRegion\022\027" + + ".GetOnlineRegionRequest\032\030.GetOnlineRegio" + + "nResponse\0225\n\nOpenRegion\022\022.OpenRegionRequ" + + "est\032\023.OpenRegionResponse\0228\n\013CloseRegion\022", + "\023.CloseRegionRequest\032\024.CloseRegionRespon" + + "se\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024" + + ".FlushRegionResponse\0228\n\013SplitRegion\022\023.Sp" + + "litRegionRequest\032\024.SplitRegionResponse\022>" + + "\n\rCompactRegion\022\025.CompactRegionRequest\032\026" + + ".CompactRegionResponse\022;\n\014MergeRegions\022\024" + + ".MergeRegionsRequest\032\025.MergeRegionsRespo" + + "nse\022J\n\021ReplicateWALEntry\022\031.ReplicateWALE" + + "ntryRequest\032\032.ReplicateWALEntryResponse\022" + + "?\n\006Replay\022\031.ReplicateWALEntryRequest\032\032.R", + "eplicateWALEntryResponse\022>\n\rRollWALWrite" + + "r\022\025.RollWALWriterRequest\032\026.RollWALWriter" + + "Response\022>\n\rGetServerInfo\022\025.GetServerInf" + + "oRequest\032\026.GetServerInfoResponse\0225\n\nStop" + + "Server\022\022.StopServerRequest\032\023.StopServerR" + + "esponse\022M\n\022UpdateFavoredNodes\022\032.UpdateFa" + + "voredNodesRequest\032\033.UpdateFavoredNodesRe" + + "sponse\022P\n\023UpdateConfiguration\022\033.UpdateCo" + + "nfigurationRequest\032\034.UpdateConfiguration" + + "ResponseBA\n*org.apache.hadoop.hbase.prot", + "obuf.generatedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -22210,13 +22423,13 @@ public final class AdminProtos { internal_static_FlushRegionRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_FlushRegionRequest_descriptor, - new java.lang.String[] { "Region", "IfOlderThanTs", }); + new java.lang.String[] { "Region", 
"IfOlderThanTs", "WriteFlushWalMarker", }); internal_static_FlushRegionResponse_descriptor = getDescriptor().getMessageTypes().get(11); internal_static_FlushRegionResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_FlushRegionResponse_descriptor, - new java.lang.String[] { "LastFlushTime", "Flushed", }); + new java.lang.String[] { "LastFlushTime", "Flushed", "WroteFlushWalMarker", }); internal_static_SplitRegionRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_SplitRegionRequest_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index 35192cccad6..fa73077498a 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -5695,6 +5695,14 @@ public final class WALProtos { * ABORT_FLUSH = 2; */ ABORT_FLUSH(2, 2), + /** + * CANNOT_FLUSH = 3; + * + *
+       * marker for indicating that a flush has been requested but cannot complete
+       * 
+ */ + CANNOT_FLUSH(3, 3), ; /** @@ -5709,6 +5717,14 @@ public final class WALProtos { * ABORT_FLUSH = 2; */ public static final int ABORT_FLUSH_VALUE = 2; + /** + * CANNOT_FLUSH = 3; + * + *
+       * marker for indicating that a flush has been requested but cannot complete
+       * 
+ */ + public static final int CANNOT_FLUSH_VALUE = 3; public final int getNumber() { return value; } @@ -5718,6 +5734,7 @@ public final class WALProtos { case 0: return START_FLUSH; case 1: return COMMIT_FLUSH; case 2: return ABORT_FLUSH; + case 3: return CANNOT_FLUSH; default: return null; } } @@ -11848,7 +11865,7 @@ public final class WALProtos { "n_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020com" + "paction_input\030\004 \003(\t\022\031\n\021compaction_output" + "\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region" + - "_name\030\007 \001(\014\"\200\003\n\017FlushDescriptor\022,\n\006actio" + + "_name\030\007 \001(\014\"\222\003\n\017FlushDescriptor\022,\n\006actio" + "n\030\001 \002(\0162\034.FlushDescriptor.FlushAction\022\022\n", "\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_name" + "\030\003 \002(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n" + @@ -11856,25 +11873,26 @@ public final class WALProtos { "toreFlushDescriptor\022\023\n\013region_name\030\006 \001(\014" + "\032Y\n\024StoreFlushDescriptor\022\023\n\013family_name\030" + "\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\024\n\014flush_o" + - "utput\030\003 \003(\t\"A\n\013FlushAction\022\017\n\013START_FLUS" + - "H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"R" + - "\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026" + - "\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003", - "(\t\"\215\001\n\022BulkLoadDescriptor\022\036\n\ntable_name\030" + - "\001 \002(\0132\n.TableName\022\033\n\023encoded_region_name" + - "\030\002 \002(\014\022 \n\006stores\030\003 \003(\0132\020.StoreDescriptor" + - "\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\237\002\n\025RegionEve" + - "ntDescriptor\0224\n\nevent_type\030\001 \002(\0162 .Regio" + - "nEventDescriptor.EventType\022\022\n\ntable_name" + - "\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022\033\n\023l" + - "og_sequence_number\030\004 \001(\004\022 \n\006stores\030\005 \003(\013" + - "2\020.StoreDescriptor\022\033\n\006server\030\006 \001(\0132\013.Ser" + - "verName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventTyp", - "e\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\n" + - "WALTrailer*F\n\tScopeType\022\033\n\027REPLICATION_S" + - "COPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL" + - "\020\001B?\n*org.apache.hadoop.hbase.protobuf.g" + - "eneratedB\tWALProtosH\001\210\001\000\240\001\001" + "utput\030\003 \003(\t\"S\n\013FlushAction\022\017\n\013START_FLUS" + + "H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\022\020" + + "\n\014CANNOT_FLUSH\020\003\"R\n\017StoreDescriptor\022\023\n\013f" + + "amily_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t", + "\022\022\n\nstore_file\030\003 \003(\t\"\215\001\n\022BulkLoadDescrip" + + "tor\022\036\n\ntable_name\030\001 \002(\0132\n.TableName\022\033\n\023e" + + "ncoded_region_name\030\002 \002(\014\022 \n\006stores\030\003 \003(\013" + + "2\020.StoreDescriptor\022\030\n\020bulkload_seq_num\030\004" + + " \002(\003\"\237\002\n\025RegionEventDescriptor\0224\n\nevent_" + + "type\030\001 \002(\0162 .RegionEventDescriptor.Event" + + "Type\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_regi" + + "on_name\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 \001" + + "(\004\022 \n\006stores\030\005 
\003(\0132\020.StoreDescriptor\022\033\n\006" + + "server\030\006 \001(\0132\013.ServerName\022\023\n\013region_name", + "\030\007 \001(\014\".\n\tEventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014" + + "REGION_CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScopeTyp" + + "e\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLIC" + + "ATION_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoo" + + "p.hbase.protobuf.generatedB\tWALProtosH\001\210" + + "\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index fcc4e1d0248..5f0572a540f 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -115,11 +115,13 @@ message CloseRegionResponse { message FlushRegionRequest { required RegionSpecifier region = 1; optional uint64 if_older_than_ts = 2; + optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed } message FlushRegionResponse { required uint64 last_flush_time = 1; optional bool flushed = 2; + optional bool wrote_flush_wal_marker = 3; } /** diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index 3fd60255cba..9853e36467f 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -109,6 +109,7 @@ message FlushDescriptor { START_FLUSH = 0; COMMIT_FLUSH = 1; ABORT_FLUSH = 2; + CANNOT_FLUSH = 3; // marker for indicating that a flush has been requested but cannot complete } message StoreFlushDescriptor { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8f73af58aa9..ac1338294b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -480,6 +480,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // final Result result; final String failureReason; final long flushSequenceId; + final boolean wroteFlushWalMarker; /** * Convenience constructor to use when the flush is successful, the failure message is set to @@ -489,7 +490,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * memstores. */ FlushResult(Result result, long flushSequenceId) { - this(result, flushSequenceId, null); + this(result, flushSequenceId, null, false); assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result .FLUSHED_COMPACTION_NEEDED; } @@ -499,8 +500,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH. * @param failureReason Reason why we couldn't flush. */ - FlushResult(Result result, String failureReason) { - this(result, -1, failureReason); + FlushResult(Result result, String failureReason, boolean wroteFlushMarker) { + this(result, -1, failureReason, wroteFlushMarker); assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH; } @@ -510,10 +511,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @param flushSequenceId Generated sequence id if the memstores were flushed else -1. 
* @param failureReason Reason why we couldn't flush, or null. */ - FlushResult(Result result, long flushSequenceId, String failureReason) { + FlushResult(Result result, long flushSequenceId, String failureReason, + boolean wroteFlushMarker) { this.result = result; this.flushSequenceId = flushSequenceId; this.failureReason = failureReason; + this.wroteFlushWalMarker = wroteFlushMarker; } /** @@ -1787,7 +1790,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @throws IOException */ public FlushResult flushcache() throws IOException { - return flushcache(true); + return flushcache(true, false); } /** @@ -1811,11 +1814,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * because a Snapshot was not properly persisted. */ public FlushResult flushcache(boolean forceFlushAllStores) throws IOException { + return flushcache(forceFlushAllStores, false); + } + + + /** + * Flush the cache. + * + * When this method is called the cache will be flushed unless: + *
+   * <ol>
+   *   <li>the cache is empty</li>
+   *   <li>the region is closed.</li>
+   *   <li>a flush is already in progress</li>
+   *   <li>writes are disabled</li>
+   * </ol>
+   *
+   * <p>

This method may block for some time, so it should not be called from a + * time-sensitive thread. + * @param forceFlushAllStores whether we want to flush all stores + * @param writeFlushRequestWalMarker whether to write the flush request marker to WAL + * @return whether the flush is success and whether the region needs compacting + * + * @throws IOException general io exceptions + * @throws DroppedSnapshotException Thrown when replay of wal is required + * because a Snapshot was not properly persisted. + */ + public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) + throws IOException { // fail-fast instead of waiting on the lock if (this.closing.get()) { String msg = "Skipping flush on " + this + " because closing"; LOG.debug(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); + return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false); } MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); status.setStatus("Acquiring readlock on region"); @@ -1826,7 +1856,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // String msg = "Skipping flush on " + this + " because closed"; LOG.debug(msg); status.abort(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); + return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false); } if (coprocessorHost != null) { status.setStatus("Running coprocessor pre-flush hooks"); @@ -1851,14 +1881,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // + (writestate.flushing ? "already flushing" : "writes not enabled"); status.abort(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); + return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false); } } try { Collection specificStoresToFlush = forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); - FlushResult fs = internalFlushcache(specificStoresToFlush, status); + FlushResult fs = internalFlushcache(specificStoresToFlush, + status, writeFlushRequestWalMarker); if (coprocessorHost != null) { status.setStatus("Running post-flush coprocessor hooks"); @@ -1955,7 +1986,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // */ private FlushResult internalFlushcache(MonitoredTask status) throws IOException { - return internalFlushcache(stores.values(), status); + return internalFlushcache(stores.values(), status, false); } /** @@ -1964,9 +1995,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @see #internalFlushcache(WAL, long, Collection, MonitoredTask) */ private FlushResult internalFlushcache(final Collection storesToFlush, - MonitoredTask status) throws IOException { + MonitoredTask status, boolean writeFlushWalMarker) throws IOException { return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, - status); + status, writeFlushWalMarker); } /** @@ -1998,9 +2029,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * properly persisted. 
*/ protected FlushResult internalFlushcache(final WAL wal, final long myseqid, - final Collection storesToFlush, MonitoredTask status) throws IOException { + final Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker) + throws IOException { PrepareFlushResult result - = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false); + = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker); if (result.result == null) { return internalFlushCacheAndCommit(wal, status, result, storesToFlush); } else { @@ -2010,7 +2042,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // protected PrepareFlushResult internalPrepareFlushCache( final WAL wal, final long myseqid, final Collection storesToFlush, - MonitoredTask status, boolean isReplay) + MonitoredTask status, boolean writeFlushWalMarker) throws IOException { if (this.rsServices != null && this.rsServices.isAborted()) { @@ -2036,14 +2068,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // w = mvcc.beginMemstoreInsert(); long flushSeqId = getNextSequenceId(wal); FlushResult flushResult = new FlushResult( - FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush"); + FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush", + writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); w.setWriteNumber(flushSeqId); mvcc.waitForPreviousTransactionsComplete(w); w = null; return new PrepareFlushResult(flushResult, myseqid); } else { return new PrepareFlushResult( - new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"), + new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush", + false), myseqid); } } @@ -2110,7 +2144,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); - return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg), + return new PrepareFlushResult( + new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false), myseqid); } flushOpSeqId = getNextSequenceId(wal); @@ -2198,6 +2233,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // flushedSeqId, totalFlushableSizeOfFlushableStores); } + /** + * Writes a marker to WAL indicating a flush is requested but cannot be complete due to various + * reasons. Ignores exceptions from WAL. Returns whether the write succeeded. 
+ * @param wal + * @return + */ + private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) { + if (writeFlushWalMarker && wal != null && !writestate.readOnly) { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH, + getRegionInfo(), -1, new TreeMap>()); + try { + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), + desc, sequenceId, true); + return true; + } catch (IOException e) { + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received exception while trying to write the flush request to wal", e); + } + } + return false; + } + protected FlushResult internalFlushCacheAndCommit( final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult, final Collection storesToFlush) @@ -2267,8 +2324,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, sequenceId, false); } catch (Throwable ex) { - LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + - StringUtils.stringifyException(ex)); + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + + StringUtils.stringifyException(ex)); // ignore this since we will be aborting the RS with DSE. } wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); @@ -3546,7 +3604,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // protected void checkReadsEnabled() throws IOException { if (!this.writestate.readsEnabled) { - throw new IOException("The region's reads are disabled. Cannot serve the request"); + throw new IOException(getRegionInfo().getEncodedName() + + ": The region's reads are disabled. Cannot serve the request"); } } @@ -3835,7 +3894,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } if (seqid > minSeqIdForTheRegion) { // Then we added some edits to memory. Flush and cleanup split edit files. - internalFlushcache(null, seqid, stores.values(), status); + internalFlushcache(null, seqid, stores.values(), status, false); } // Now delete the content of recovered edits. We're done w/ them. if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { @@ -3937,7 +3996,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (currentEditSeqId > key.getLogSeqNum()) { // when this condition is true, it means we have a serious defect because we need to // maintain increasing SeqId for WAL edits per region - LOG.error("Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key + LOG.error(getRegionInfo().getEncodedName() + " : " + + "Found decreasing SeqId. 
PreId=" + currentEditSeqId + " key=" + key + "; edit=" + val); } else { currentEditSeqId = key.getLogSeqNum(); @@ -4001,7 +4061,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // editsCount++; } if (flush) { - internalFlushcache(null, currentEditSeqId, stores.values(), status); + internalFlushcache(null, currentEditSeqId, stores.values(), status, false); } if (coprocessorHost != null) { @@ -4060,18 +4120,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // "Compaction marker from WAL ", compaction); if (replaySeqId < lastReplayedOpenRegionSeqId) { - LOG.warn("Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction) - + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId " - + " of " + lastReplayedOpenRegionSeqId); + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction) + + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId " + + " of " + lastReplayedOpenRegionSeqId); return; } + if (LOG.isDebugEnabled()) { + LOG.debug(getRegionInfo().getEncodedName() + " : " + + "Replaying compaction marker " + TextFormat.shortDebugString(compaction)); + } + startRegionOperation(Operation.REPLAY_EVENT); try { Store store = this.getStore(compaction.getFamilyName().toByteArray()); if (store == null) { - LOG.warn("Found Compaction WAL edit for deleted family:" + - Bytes.toString(compaction.getFamilyName().toByteArray())); + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Found Compaction WAL edit for deleted family:" + + Bytes.toString(compaction.getFamilyName().toByteArray())); return; } store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles); @@ -4080,7 +4147,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - void replayWALFlushMarker(FlushDescriptor flush) throws IOException { + void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException { checkTargetRegion(flush.getEncodedRegionName().toByteArray(), "Flush marker from WAL ", flush); @@ -4089,7 +4156,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } if (LOG.isDebugEnabled()) { - LOG.debug("Replaying flush marker " + TextFormat.shortDebugString(flush)); + LOG.debug(getRegionInfo().getEncodedName() + " : " + + "Replaying flush marker " + TextFormat.shortDebugString(flush)); } startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to guard against close @@ -4105,9 +4173,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // case ABORT_FLUSH: replayWALFlushAbortMarker(flush); break; + case CANNOT_FLUSH: + replayWALFlushCannotFlushMarker(flush, replaySeqId); + break; default: - LOG.warn("Received a flush event with unknown action, ignoring. " - + TextFormat.shortDebugString(flush)); + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush event with unknown action, ignoring. " + + TextFormat.shortDebugString(flush)); break; } } finally { @@ -4128,7 +4200,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // byte[] family = storeFlush.getFamilyName().toByteArray(); Store store = getStore(family); if (store == null) { - LOG.info("Received a flush start marker from primary, but the family is not found. 
Ignoring" + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush start marker from primary, but the family is not found. Ignoring" + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush)); continue; } @@ -4142,9 +4215,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // synchronized (writestate) { try { if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) { - LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush) - + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId " - + " of " + lastReplayedOpenRegionSeqId); + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush) + + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId " + + " of " + lastReplayedOpenRegionSeqId); return null; } if (numMutationsWithoutWAL.get() > 0) { @@ -4158,7 +4232,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal PrepareFlushResult prepareResult = internalPrepareFlushCache(null, - flushSeqId, storesToFlush, status, true); + flushSeqId, storesToFlush, status, false); if (prepareResult.result == null) { // save the PrepareFlushResult so that we can use it later from commit flush this.writestate.flushing = true; @@ -4169,6 +4243,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // + " Prepared flush with seqId:" + flush.getFlushSequenceNumber()); } } else { + // special case empty memstore. We will still save the flush result in this case, since + // our memstore ie empty, but the primary is still flushing + if (prepareResult.result.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { + this.writestate.flushing = true; + this.prepareFlushResult = prepareResult; + if (LOG.isDebugEnabled()) { + LOG.debug(getRegionInfo().getEncodedName() + " : " + + " Prepared empty flush with seqId:" + flush.getFlushSequenceNumber()); + } + } status.abort("Flush prepare failed with " + prepareResult.result); // nothing much to do. prepare flush failed because of some reason. } @@ -4177,20 +4261,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // we already have an active snapshot. if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) { // They define the same flush. Log and continue. - LOG.warn("Received a flush prepare marker with the same seqId: " + + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush prepare marker with the same seqId: " + + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: " + prepareFlushResult.flushOpSeqId + ". Ignoring"); // ignore } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) { // We received a flush with a smaller seqNum than what we have prepared. We can only // ignore this prepare flush request. - LOG.warn("Received a flush prepare marker with a smaller seqId: " + + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush prepare marker with a smaller seqId: " + + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: " + prepareFlushResult.flushOpSeqId + ". 
Ignoring"); // ignore } else { // We received a flush with a larger seqNum than what we have prepared - LOG.warn("Received a flush prepare marker with a larger seqId: " + + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush prepare marker with a larger seqId: " + + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: " + prepareFlushResult.flushOpSeqId + ". Ignoring"); // We do not have multiple active snapshots in the memstore or a way to merge current @@ -4225,7 +4312,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // synchronized (writestate) { try { if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) { - LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush) + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush) + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId " + " of " + lastReplayedOpenRegionSeqId); return; @@ -4253,7 +4341,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // we received a flush commit with a smaller seqId than what we have prepared // we will pick the flush file up from this commit (if we have not seen it), but we // will not drop the memstore - LOG.warn("Received a flush commit marker with smaller seqId: " + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush commit marker with smaller seqId: " + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping" +" prepared memstore snapshot"); @@ -4267,7 +4356,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // we will pick the flush file for this. We will also obtain the updates lock and // look for contents of the memstore to see whether we have edits after this seqId. // If not, we will drop all the memstore edits and the snapshot as well. - LOG.warn("Received a flush commit marker with larger seqId: " + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush commit marker with larger seqId: " + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " + prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared" +" memstore snapshot"); @@ -4284,6 +4374,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // this.prepareFlushResult = null; writestate.flushing = false; } + // If we were waiting for observing a flush or region opening event for not showing + // partial data after a secondary region crash, we can allow reads now. We can only make + // sure that we are not showing partial data (for example skipping some previous edits) + // until we observe a full flush start and flush commit. So if we were not able to find + // a previous flush we will not enable reads now. + this.setReadsEnabled(true); } else { LOG.warn(getRegionInfo().getEncodedName() + " : " + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber() @@ -4337,14 +4433,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // byte[] family = storeFlush.getFamilyName().toByteArray(); Store store = getStore(family); if (store == null) { - LOG.warn("Received a flush commit marker from primary, but the family is not found." 
+ - "Ignoring StoreFlushDescriptor:" + storeFlush); + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush commit marker from primary, but the family is not found." + + "Ignoring StoreFlushDescriptor:" + storeFlush); continue; } List flushFiles = storeFlush.getFlushOutputList(); StoreFlushContext ctx = null; long startTime = EnvironmentEdgeManager.currentTime(); - if (prepareFlushResult == null) { + if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) { ctx = store.createFlushContext(flush.getFlushSequenceNumber()); } else { ctx = prepareFlushResult.storeFlushCtxs.get(family); @@ -4352,7 +4449,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } if (ctx == null) { - LOG.warn("Unexpected: flush commit marker received from store " + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Unexpected: flush commit marker received from store " + Bytes.toString(family) + " but no associated flush context. Ignoring"); continue; } @@ -4376,7 +4474,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // long currentSeqId = getSequenceId().get(); if (seqId >= currentSeqId) { // then we can drop the memstore contents since everything is below this seqId - LOG.info("Dropping memstore contents as well since replayed flush seqId: " + LOG.info(getRegionInfo().getEncodedName() + " : " + + "Dropping memstore contents as well since replayed flush seqId: " + seqId + " is greater than current seqId:" + currentSeqId); // Prepare flush (take a snapshot) and then abort (drop the snapshot) @@ -4388,7 +4487,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // dropStoreMemstoreContentsForSeqId(store, currentSeqId); } } else { - LOG.info("Not dropping memstore contents since replayed flush seqId: " + LOG.info(getRegionInfo().getEncodedName() + " : " + + "Not dropping memstore contents since replayed flush seqId: " + seqId + " is smaller than current seqId:" + currentSeqId); } } finally { @@ -4409,6 +4509,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // that will drop the snapshot } + private void replayWALFlushCannotFlushMarker(FlushDescriptor flush, long replaySeqId) { + synchronized (writestate) { + if (this.lastReplayedOpenRegionSeqId > replaySeqId) { + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush) + + " because its sequence id " + replaySeqId + " is smaller than this regions " + + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId); + return; + } + + // If we were waiting for observing a flush or region opening event for not showing partial + // data after a secondary region crash, we can allow reads now. This event means that the + // primary was not able to flush because memstore is empty when we requested flush. By the + // time we observe this, we are guaranteed to have up to date seqId with our previous + // assignment. 
+ this.setReadsEnabled(true); + } + } + @VisibleForTesting PrepareFlushResult getPrepareFlushResult() { return prepareFlushResult; @@ -4429,13 +4548,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return; } if (regionEvent.getEventType() != EventType.REGION_OPEN) { - LOG.warn("Unknown region event received, ignoring :" + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Unknown region event received, ignoring :" + TextFormat.shortDebugString(regionEvent)); return; } if (LOG.isDebugEnabled()) { - LOG.debug("Replaying region open event marker " + TextFormat.shortDebugString(regionEvent)); + LOG.debug(getRegionInfo().getEncodedName() + " : " + + "Replaying region open event marker " + TextFormat.shortDebugString(regionEvent)); } // we will use writestate as a coarse-grain lock for all the replay events @@ -4446,10 +4567,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // region open event's seqid. Since this is the first event that the region puts (after // possibly flushing recovered.edits), after seeing this event, we can ignore every edit // smaller than this seqId - if (this.lastReplayedOpenRegionSeqId < regionEvent.getLogSequenceNumber()) { + if (this.lastReplayedOpenRegionSeqId <= regionEvent.getLogSequenceNumber()) { this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber(); } else { - LOG.warn("Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent) + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent) + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId " + " of " + lastReplayedOpenRegionSeqId); return; @@ -4462,7 +4584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // byte[] family = storeDescriptor.getFamilyName().toByteArray(); Store store = getStore(family); if (store == null) { - LOG.warn("Received a region open marker from primary, but the family is not found. " + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a region open marker from primary, but the family is not found. " + "Ignoring. StoreDescriptor:" + storeDescriptor); continue; } @@ -4478,7 +4601,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (writestate.flushing) { // only drop memstore snapshots if they are smaller than last flush for the store if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) { - StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs.get(family); + StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ? + null : this.prepareFlushResult.storeFlushCtxs.get(family); if (ctx != null) { long snapshotSize = store.getFlushableSize(); ctx.abort(); @@ -4524,6 +4648,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // either greater than flush seq number or they were already dropped via flush. getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId); + // If we were waiting for observing a flush or region opening event for not showing partial + // data after a secondary region crash, we can allow reads now. + this.setReadsEnabled(true); + // C. Finally notify anyone waiting on memstore to clear: // e.g. checkResources(). 
synchronized (this) { @@ -4865,7 +4993,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is // a sequence id that we can be sure is beyond the last hfile written). if (assignSeqId) { - FlushResult fs = this.flushcache(true); + FlushResult fs = this.flushcache(); if (fs.isFlushSucceeded()) { seqId = fs.flushSequenceId; } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { @@ -5832,8 +5960,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // FileSystem fs = a.getRegionFileSystem().getFileSystem(); // Make sure each region's cache is empty - a.flushcache(true); - b.flushcache(true); + a.flushcache(); + b.flushcache(); // Compact each region so we only have one store file per family a.compactStores(true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index c77bb3766d3..debcd6d3876 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; @@ -94,6 +95,7 @@ import org.apache.hadoop.hbase.http.InfoServer; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -128,6 +130,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; +import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; @@ -143,6 +146,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.JSONBean; import org.apache.hadoop.hbase.util.JvmPauseMonitor; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; @@ -313,6 +317,9 @@ public class HRegionServer extends HasThread implements // RPC client. Used to make the stub above that does region server status checking. 
RpcClient rpcClient; + private RpcRetryingCallerFactory rpcRetryingCallerFactory; + private RpcControllerFactory rpcControllerFactory; + private UncaughtExceptionHandler uncaughtExceptionHandler; // Info server. Default access so can be used by unit tests. REGIONSERVER @@ -369,6 +376,7 @@ public class HRegionServer extends HasThread implements protected final Sleeper sleeper; private final int operationTimeout; + private final int shortOperationTimeout; private final RegionServerAccounting regionServerAccounting; @@ -495,6 +503,10 @@ public class HRegionServer extends HasThread implements "hbase.regionserver.numregionstoreport", 10); this.operationTimeout = conf.getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + + this.shortOperationTimeout = conf.getInt( HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); @@ -506,6 +518,9 @@ public class HRegionServer extends HasThread implements String hostName = rpcServices.isa.getHostName(); serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode); + rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); + rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + // login the zookeeper client principal (if using security) ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file", "hbase.zookeeper.client.kerberos.principal", hostName); @@ -1639,6 +1654,12 @@ public class HRegionServer extends HasThread implements this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); + if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { + this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, + conf.getInt("hbase.regionserver.region.replica.flusher.threads", + conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); + } + Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); @@ -1842,6 +1863,8 @@ public class HRegionServer extends HasThread implements + r.getRegionNameAsString()); } + triggerFlushInPrimaryRegion(r); + LOG.debug("Finished post open deploy task for " + r.getRegionNameAsString()); } @@ -1917,6 +1940,30 @@ public class HRegionServer extends HasThread implements return false; } + /** + * Trigger a flush in the primary region replica if this region is a secondary replica. Does not + * block this thread. See RegionReplicaFlushHandler for details. + */ + void triggerFlushInPrimaryRegion(final HRegion region) { + if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { + return; + } + if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(getConfiguration()) || + !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled( + getConfiguration())) { + region.setReadsEnabled(true); + return; + } + + region.setReadsEnabled(false); // disable reads before marking the region as opened. + // RegionReplicaFlushHandler might reset this. 
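+ // A minimal opt-out sketch, grounded in the ServerRegionReplicaUtil change later in this patch: clusters that accept the weaker read semantics can set conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false), in which case the check above re-enables reads immediately and no flush handler is submitted.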
+ + // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler + this.service.submit( + new RegionReplicaFlushHandler(this, clusterConnection, + rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); + } + @Override public RpcServerInterface getRpcServer() { return rpcServices.rpcServer; @@ -2106,7 +2153,8 @@ public class HRegionServer extends HasThread implements } try { BlockingRpcChannel channel = - this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), operationTimeout); + this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), + shortOperationTimeout); intf = RegionServerStatusService.newBlockingStub(channel); break; } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7dbe55dc13a..11a820bba0c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2241,6 +2241,8 @@ public class HStore implements Store { StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file); StoreFile storeFile = createStoreFileAndReader(storeFileInfo); storeFiles.add(storeFile); + HStore.this.storeSize += storeFile.getReader().length(); + HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes(); if (LOG.isInfoEnabled()) { LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() + @@ -2249,7 +2251,10 @@ public class HStore implements Store { } } - long snapshotId = dropMemstoreSnapshot ? snapshot.getId() : -1; // -1 means do not drop + long snapshotId = -1; // -1 means do not drop + if (dropMemstoreSnapshot && snapshot != null) { + snapshotId = snapshot.getId(); + } HStore.this.updateStorefiles(storeFiles, snapshotId); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 4cd25dabe3d..cd0813bdd1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -724,7 +724,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); if (flushDesc != null && !isDefaultReplica) { - region.replayWALFlushMarker(flushDesc); + region.replayWALFlushMarker(flushDesc, replaySeqId); continue; } RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); @@ -1091,18 +1091,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); if (shouldFlush) { + boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? 
+ request.getWriteFlushWalMarker() : false; long startTime = EnvironmentEdgeManager.currentTime(); - HRegion.FlushResult flushResult = region.flushcache(); + HRegion.FlushResult flushResult = region.flushcache(true, writeFlushWalMarker); if (flushResult.isFlushSucceeded()) { long endTime = EnvironmentEdgeManager.currentTime(); regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); } - boolean result = flushResult.isCompactionNeeded(); - if (result) { + boolean compactionNeeded = flushResult.isCompactionNeeded(); + if (compactionNeeded) { regionServer.compactSplitThread.requestSystemCompaction(region, "Compaction through user triggered flush"); } - builder.setFlushed(result); + builder.setFlushed(flushResult.isFlushSucceeded()); + builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); } builder.setLastFlushTime( region.getEarliestFlushTimeForAllStores()); return builder.build(); @@ -1460,7 +1463,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, "regions. First region:" + regionName.toStringUtf8() + " , other region:" + entry.getKey().getEncodedRegionName()); } - if (regionServer.nonceManager != null) { + if (regionServer.nonceManager != null && isPrimary) { long nonceGroup = entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java new file mode 100644 index 00000000000..9661b30b072 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver.handler; + +import java.io.IOException; +import java.io.InterruptedIOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.FlushRegionCallable; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; + +/** + * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in + * secondary region replicas. This means that a secondary region replica can serve some edits from + * its memstore that is still not flushed from the primary. We do not want to allow the secondary + * region's seqId to go back in time when this secondary region is opened elsewhere after a + * crash or region move. We will trigger a flush cache in the primary region replica and wait + * for observing a complete flush cycle before marking the region readsEnabled. This handler triggers + * the flush in the primary region replica and ensures that regular region opening is not + * blocked while the secondary replica waits on that flush. + */ +@InterfaceAudience.Private +public class RegionReplicaFlushHandler extends EventHandler { + + private static final Log LOG = LogFactory.getLog(RegionReplicaFlushHandler.class); + + private final ClusterConnection connection; + private final RpcRetryingCallerFactory rpcRetryingCallerFactory; + private final RpcControllerFactory rpcControllerFactory; + private final int operationTimeout; + private final HRegion region; + + public RegionReplicaFlushHandler(Server server, ClusterConnection connection, + RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory, + int operationTimeout, HRegion region) { + super(server, EventType.RS_REGION_REPLICA_FLUSH); + this.connection = connection; + this.rpcRetryingCallerFactory = rpcRetryingCallerFactory; + this.rpcControllerFactory = rpcControllerFactory; + this.operationTimeout = operationTimeout; + this.region = region; + } + + @Override + public void process() throws IOException { + triggerFlushInPrimaryRegion(region); + } + + @Override + protected void handleException(Throwable t) { + super.handleException(t); + + if (t instanceof InterruptedIOException || t instanceof InterruptedException) { + // ignore + } else if (t instanceof RuntimeException) { + server.abort("ServerAborting because a runtime exception was thrown", t); + } else { + // something is fishy since we could not flush the primary region even after all the retries + // (about 35 by default, see getRetriesCount() below). We cannot close the region since there is no + // mechanism to close a region without the master triggering it. We just abort the server for now.
+ server.abort("ServerAborting because an exception was thrown", t); + } + } + + private int getRetriesCount(Configuration conf) { + int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + if (numRetries > 10) { + int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10); + numRetries = numRetries / mult; // reset if HRS has multiplied this already + } + return numRetries; + } + + void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException { + long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + + int maxAttempts = getRetriesCount(connection.getConfiguration()); + RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil + .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region " + + region.getRegionInfo().getEncodedName() + " to trigger a flush"); + } + while (!region.isClosing() && !region.isClosed() + && !server.isAborted() && !server.isStopped()) { + FlushRegionCallable flushCallable = new FlushRegionCallable( + connection, rpcControllerFactory, + RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true); + + // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we + // do not have to wait for the whole flush here, just initiate it. + FlushRegionResponse response = null; + try { + response = rpcRetryingCallerFactory.newCaller() + .callWithRetries(flushCallable, this.operationTimeout); + } catch (IOException ex) { + if (ex instanceof TableNotFoundException + || connection.isTableDisabled(region.getRegionInfo().getTable())) { + return; + } + throw ex; + } + + if (response.getFlushed()) { + // then we have to wait for seeing the flush entry. All reads will be rejected until we see + // a complete flush cycle or replay a region open event + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully triggered a flush of primary region replica " + + ServerRegionReplicaUtil + .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + + " of region " + region.getRegionInfo().getEncodedName() + + " Now waiting and blocking reads until observing a full flush cycle"); + } + break; + } else { + if (response.hasWroteFlushWalMarker()) { + if(response.getWroteFlushWalMarker()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " + + "region replica " + ServerRegionReplicaUtil + .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and " + + "blocking reads until observing a flush marker"); + } + break; + } else { + // somehow we were not able to get the primary to write the flush request. It may be + // closing or already flushing. Retry flush again after some sleep. + if (!counter.shouldRetry()) { + throw new IOException("Cannot cause primary to flush or drop a wal marker after " + + "retries. Failing opening of this region replica " + + region.getRegionInfo().getEncodedName()); + } + } + } else { + // nothing to do. Are we dealing with an old server? + LOG.warn("Was not able to trigger a flush from primary region due to old server version? 
" + + "Continuing to open the secondary region replica: " + + region.getRegionInfo().getEncodedName()); + region.setReadsEnabled(true); + break; + } + } + try { + counter.sleepUntilNextRetry(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index 3dd2e01c58b..348136fcd50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; @@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetryingCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; @@ -60,7 +58,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; -import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; @@ -88,6 +85,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class); + // Can be configured differently than hbase.client.retries.number + private static String CLIENT_RETRIES_NUMBER + = "hbase.region.replica.replication.client.retries.number"; + private Configuration conf; private ClusterConnection connection; @@ -109,6 +110,20 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { this.conf = HBaseConfiguration.create(context.getConfiguration()); + // HRS multiplies client retries by 10 globally for meta operations, but we do not want this. + // We are resetting it here because we want default number of retries (35) rather than 10 times + // that which makes very long retries for disabled tables etc. 
+ int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + if (defaultNumRetries > 10) { + int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10); + defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already + } + + conf.setInt("hbase.client.serverside.retries.multiplier", 1); + int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); + this.numWriterThreads = this.conf.getInt( "hbase.region.replica.replication.writer.threads", 3); controller = new PipelineController(); @@ -358,7 +373,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { while (true) { // get the replicas of the primary region try { - locations = getRegionLocations(connection, tableName, row, useCache, 0); + locations = RegionReplicaReplayCallable + .getRegionLocations(connection, tableName, row, useCache, 0); if (locations == null) { throw new HBaseIOException("Cannot locate locations for " @@ -490,59 +506,21 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { */ static class RegionReplicaReplayCallable extends RegionAdminServiceCallable { - // replicaId of the region replica that we want to replicate to - private final int replicaId; private final List entries; private final byte[] initialEncodedRegionName; private final AtomicLong skippedEntries; - private final RpcControllerFactory rpcControllerFactory; - private boolean skip; public RegionReplicaReplayCallable(ClusterConnection connection, RpcControllerFactory rpcControllerFactory, TableName tableName, HRegionLocation location, HRegionInfo regionInfo, byte[] row,List entries, AtomicLong skippedEntries) { - super(connection, location, tableName, row); - this.replicaId = regionInfo.getReplicaId(); + super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId()); this.entries = entries; - this.rpcControllerFactory = rpcControllerFactory; this.skippedEntries = skippedEntries; this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); } - @Override - public HRegionLocation getLocation(boolean useCache) throws IOException { - RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId); - if (rl == null) { - throw new HBaseIOException(getExceptionMessage()); - } - location = rl.getRegionLocation(replicaId); - if (location == null) { - throw new HBaseIOException(getExceptionMessage()); - } - - // check whether we should still replay this entry. If the regions are changed, or the - // entry is not coming form the primary region, filter it out because we do not need it. 
- // Regions can change because of (1) region split (2) region merge (3) table recreated - if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), - initialEncodedRegionName)) { - skip = true; - if (LOG.isTraceEnabled()) { - LOG.trace("Skipping " + entries.size() + " entries in table " + tableName - + " because located region " + location.getRegionInfo().getEncodedName() - + " is different than the original region " - + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit"); - for (Entry entry : entries) { - LOG.trace("Skipping : " + entry); - } - } - return null; - } - - return location; - } - @Override public ReplicateWALEntryResponse call(int timeout) throws IOException { return replayToServer(this.entries, timeout); @@ -550,55 +528,46 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { private ReplicateWALEntryResponse replayToServer(List entries, int timeout) throws IOException { - if (entries.isEmpty() || skip) { + // check whether we should still replay this entry. If the regions are changed, or the + // entry is not coming form the primary region, filter it out because we do not need it. + // Regions can change because of (1) region split (2) region merge (3) table recreated + boolean skip = false; + + if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), + initialEncodedRegionName)) { + skip = true; + } + if (!entries.isEmpty() && !skip) { + Entry[] entriesArray = new Entry[entries.size()]; + entriesArray = entries.toArray(entriesArray); + + // set the region name for the target region replica + Pair p = + ReplicationProtbufUtil.buildReplicateWALEntryRequest( + entriesArray, location.getRegionInfo().getEncodedNameAsBytes()); + try { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); + controller.setCallTimeout(timeout); + controller.setPriority(tableName); + return stub.replay(controller, p.getFirst()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + if (skip) { + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping " + entries.size() + " entries in table " + tableName + + " because located region " + location.getRegionInfo().getEncodedName() + + " is different than the original region " + + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit"); + for (Entry entry : entries) { + LOG.trace("Skipping : " + entry); + } + } skippedEntries.addAndGet(entries.size()); - return ReplicateWALEntryResponse.newBuilder().build(); - } - - Entry[] entriesArray = new Entry[entries.size()]; - entriesArray = entries.toArray(entriesArray); - - // set the region name for the target region replica - Pair p = - ReplicationProtbufUtil.buildReplicateWALEntryRequest( - entriesArray, location.getRegionInfo().getEncodedNameAsBytes()); - try { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); - controller.setCallTimeout(timeout); - controller.setPriority(tableName); - return stub.replay(controller, p.getFirst()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); } + return ReplicateWALEntryResponse.newBuilder().build(); } - - @Override - protected String getExceptionMessage() { - return super.getExceptionMessage() + " table=" + tableName - + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row); - } - } - - private static RegionLocations getRegionLocations( - ClusterConnection connection, TableName tableName, byte[] row, - boolean useCache, int replicaId) - throws 
RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { - RegionLocations rl; - try { - rl = connection.locateRegion(tableName, row, useCache, true, replicaId); - } catch (DoNotRetryIOException e) { - throw e; - } catch (RetriesExhaustedException e) { - throw e; - } catch (InterruptedIOException e) { - throw e; - } catch (IOException e) { - throw new RetriesExhaustedException("Can't get the location", e); - } - if (rl == null) { - throw new RetriesExhaustedException("Can't get the locations"); - } - - return rl; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 7bcee0b1de9..710698b2bba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -55,6 +55,19 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false; private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication"; + /** + * Whether or not the secondary region will wait for observing a flush / region open event + * from the primary region via async wal replication before enabling read requests. Since replayed + * edits from async wal replication from the primary are not persisted in the WAL, the memstore of the + * secondary region might be non-empty at the time of close or crash. To ensure that the seqId does not + * "go back in time" in the secondary region replica, this should be enabled. However, in some + * cases the above semantics might be acceptable for some application classes. + * See HBASE-11580 for more context. + */ + public static final String REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY + = "hbase.region.replica.wait.for.primary.flush"; + private static final boolean DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH = true; + + /** + * Returns the regionInfo object to use for interacting with the file system.
* @return An HRegionInfo object to interact with the filesystem @@ -122,7 +135,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { * @throws IOException */ public static void setupRegionReplicaReplication(Configuration conf) throws IOException { - if (!conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION)) { + if (!isRegionReplicaReplicationEnabled(conf)) { return; } ReplicationAdmin repAdmin = new ReplicationAdmin(conf); @@ -140,6 +153,16 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { } } + public static boolean isRegionReplicaReplicationEnabled(Configuration conf) { + return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, + DEFAULT_REGION_REPLICA_REPLICATION); + } + + public static boolean isRegionReplicaWaitForPrimaryFlushEnabled(Configuration conf) { + return conf.getBoolean(REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, + DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH); + } + /** * Return the peer id used for replicating to secondary region replicas */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 0ef788790f7..2ef745196ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -2170,6 +2171,25 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } } + public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, + int replicaId) + throws IOException { + for (int i = startRow; i < endRow; i++) { + String failMsg = "Failed verification of row :" + i; + byte[] data = Bytes.toBytes(String.valueOf(i)); + Get get = new Get(data); + get.setReplicaId(replicaId); + get.setConsistency(Consistency.TIMELINE); + Result result = table.get(get); + assertTrue(failMsg, result.containsColumn(f, null)); + assertEquals(failMsg, result.getColumnCells(f, null).size(), 1); + Cell cell = result.getColumnLatestCell(f, null); + assertTrue(failMsg, + Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength())); + } + } + public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) throws IOException { for (int i = startRow; i < endRow; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTree.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTree.java index 80a50b02ab0..c4560cd1e49 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTree.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTree.java @@ -100,7 +100,7 @@ public class TestPrefixTree { put = new Put(row4_bytes); put.add(fam, qual2, Bytes.toBytes("c2-value-3")); region.put(put); - region.flushcache(true); + region.flushcache(); String[] rows = new String[3]; rows[0] = row1; rows[1] = row2; @@ -182,7 +182,7 @@ public class TestPrefixTree { region.put(new 
Put(Bytes.toBytes("obj29")).add(fam, qual1, Bytes.toBytes("whatever"))); region.put(new Put(Bytes.toBytes("obj2")).add(fam, qual1, Bytes.toBytes("whatever"))); region.put(new Put(Bytes.toBytes("obj3")).add(fam, qual1, Bytes.toBytes("whatever"))); - region.flushcache(true); + region.flushcache(); Scan scan = new Scan(Bytes.toBytes("obj29995")); RegionScanner scanner = region.getScanner(scan); List cells = new ArrayList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 09e9d5edc88..4c056c4f0c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -312,6 +313,8 @@ public class TestHRegionReplayEvents { long storeMemstoreSize = store.getMemStoreSize(); long regionMemstoreSize = secondaryRegion.getMemstoreSize().get(); long storeFlushableSize = store.getFlushableSize(); + long storeSize = store.getSize(); + long storeSizeUncompressed = store.getStoreSizeUncompressed(); if (flushDesc.getAction() == FlushAction.START_FLUSH) { LOG.info("-- Replaying flush start in secondary"); PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); @@ -339,6 +342,11 @@ public class TestHRegionReplayEvents { // assert that the region memstore is smaller now long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get(); assertTrue(regionMemstoreSize > newRegionMemstoreSize); + + // assert that the store sizes are bigger + assertTrue(store.getSize() > storeSize); + assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed); + assertEquals(store.getSize(), store.getStorefilesSize()); } // after replay verify that everything is still visible verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); @@ -1112,6 +1120,207 @@ public class TestHRegionReplayEvents { (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any()); } + /** + * Tests the reads enabled flag for the region. When unset all reads should be rejected + */ + @Test + public void testRegionReadsEnabledFlag() throws IOException { + + putDataByReplay(secondaryRegion, 0, 100, cq, families); + + verifyData(secondaryRegion, 0, 100, cq, families); + + // now disable reads + secondaryRegion.setReadsEnabled(false); + try { + verifyData(secondaryRegion, 0, 100, cq, families); + fail("Should have failed with IOException"); + } catch(IOException ex) { + // expected + } + + // verify that we can still replay data + putDataByReplay(secondaryRegion, 100, 100, cq, families); + + // now enable reads again + secondaryRegion.setReadsEnabled(true); + verifyData(secondaryRegion, 0, 200, cq, families); + } + + /** + * Tests the case where a request for flush cache is sent to the region, but region cannot flush. + * It should write the flush request marker instead. 
+ */ + @Test + public void testWriteFlushRequestMarker() throws IOException { + // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false + FlushResult result = primaryRegion.flushcache(true, false); + assertNotNull(result); + assertEquals(result.result, FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY); + assertFalse(result.wroteFlushWalMarker); + + // request flush again, but this time with writeFlushRequestWalMarker = true + result = primaryRegion.flushcache(true, true); + assertNotNull(result); + assertEquals(result.result, FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY); + assertTrue(result.wroteFlushWalMarker); + + List flushes = Lists.newArrayList(); + reader = createWALReaderForPrimary(); + while (true) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); + if (flush != null) { + flushes.add(flush); + } + } + + assertEquals(1, flushes.size()); + assertNotNull(flushes.get(0)); + assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction()); + } + + /** + * Test the case where the secondary region replica is not in reads enabled state because it is + * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH + * flush marker entry should restore the reads enabled status in the region and allow the reads + * to continue. + */ + @Test + public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException { + disableReads(secondaryRegion); + + // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from + // triggered flush restores readsEnabled + primaryRegion.flushcache(true, true); + reader = createWALReaderForPrimary(); + while (true) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); + if (flush != null) { + secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum()); + } + } + + // now reads should be enabled + secondaryRegion.get(new Get(Bytes.toBytes(0))); + } + + /** + * Test the case where the secondary region replica is not in reads enabled state because it is + * waiting for a flush or region open marker from primary region. Replaying flush start and commit + * entries should restore the reads enabled status in the region and allow the reads + * to continue. + */ + @Test + public void testReplayingFlushRestoresReadsEnabledState() throws IOException { + // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came + // from triggered flush restores readsEnabled + disableReads(secondaryRegion); + + // put some data in primary + putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); + primaryRegion.flushcache(); + + reader = createWALReaderForPrimary(); + while (true) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); + if (flush != null) { + secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum()); + } else { + replayEdit(secondaryRegion, entry); + } + } + + // now reads should be enabled + verifyData(secondaryRegion, 0, 100, cq, families); + } + + /** + * Test the case where the secondary region replica is not in reads enabled state because it is + * waiting for a flush or region open marker from primary region. 
Replaying flush start and commit + * entries should restore the reads enabled status in the region and allow the reads + * to continue. + */ + @Test + public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException { + // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came + // from triggered flush restores readsEnabled + disableReads(secondaryRegion); + + // put some data in primary + putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); + primaryRegion.flushcache(); + + reader = createWALReaderForPrimary(); + while (true) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); + if (flush != null) { + secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum()); + } + } + + // now reads should be enabled + verifyData(secondaryRegion, 0, 100, cq, families); + } + + /** + * Test the case where the secondary region replica is not in reads enabled state because it is + * waiting for a flush or region open marker from primary region. Replaying region open event + * entry from primary should restore the reads enabled status in the region and allow the reads + * to continue. + */ + @Test + public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException { + // Test case 3: Test that replaying region open event markers restores readsEnabled + disableReads(secondaryRegion); + + primaryRegion.close(); + primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); + + reader = createWALReaderForPrimary(); + while (true) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + + RegionEventDescriptor regionEventDesc + = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); + + if (regionEventDesc != null) { + secondaryRegion.replayWALRegionEventMarker(regionEventDesc); + } + } + + // now reads should be enabled + secondaryRegion.get(new Get(Bytes.toBytes(0))); + } + + private void disableReads(HRegion region) { + region.setReadsEnabled(false); + try { + verifyData(region, 0, 1, cq, families); + fail("Should have failed with IOException"); + } catch(IOException ex) { + // expected + } + } + private void replay(HRegion region, Put put, long replaySeqId) throws IOException { put.setDurability(Durability.SKIP_WAL); MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java new file mode 100644 index 00000000000..75c6967ff45 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java @@ -0,0 +1,373 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.Predicate;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerImpl;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests failover of secondary region replicas.
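+ * Covers reading from secondaries of empty and non-empty tables, killing the primary or a
+ * secondary replica while data is unflushed or writes are in flight, and opening tables with
+ * many regions and replicas.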
+ */
+@RunWith(Parameterized.class)
+@Category(LargeTests.class)
+public class TestRegionReplicaFailover {
+
+  private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
+
+  static {
+    ((Log4JLogger)RpcRetryingCallerImpl.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+
+  private static final int NB_SERVERS = 3;
+
+  protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
+      HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
+  protected final byte[] fam = HBaseTestingUtility.fam1;
+  protected final byte[] qual1 = Bytes.toBytes("qual1");
+  protected final byte[] value1 = Bytes.toBytes("value1");
+  protected final byte[] row = Bytes.toBytes("rowA");
+  protected final byte[] row2 = Bytes.toBytes("rowB");
+
+  @Rule public TestName name = new TestName();
+
+  private HTableDescriptor htd;
+
+  /*
+   * We are testing with dist log split and dist log replay separately
+   */
+  @Parameters
+  public static Collection<Object[]> getParameters() {
+    Object[][] params =
+        new Boolean[][] { {false} }; // TODO: enable dist log replay testing after HBASE-13121
+    return Arrays.asList(params);
+  }
+
+  @Parameterized.Parameter(0)
+  public boolean distributedLogReplay;
+
+  @Before
+  public void before() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
+
+    HTU.startMiniCluster(NB_SERVERS);
+    htd = HTU.createTableDescriptor(
+      name.getMethodName().substring(0, name.getMethodName().length() - 3));
+    htd.setRegionReplication(3);
+    HTU.getHBaseAdmin().createTable(htd);
+  }
+
+  @After
+  public void after() throws Exception {
+    HTU.deleteTableIfAny(htd.getTableName());
+    HTU.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests that for a newly created table with region replicas and no data, the secondary
+   * region replicas are available to read immediately.
+   */
+  @Test(timeout = 60000)
+  public void testSecondaryRegionWithEmptyRegion() throws IOException {
+    // Create a new table with region replication, don't put any data. Test that the secondary
+    // region replica is available to read.
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Table table = connection.getTable(htd.getTableName())) {
+
+      Get get = new Get(row);
+      get.setConsistency(Consistency.TIMELINE);
+      get.setReplicaId(1);
+      table.get(get); // this should not block
+    }
+  }
+
+  /**
+   * Tests that if there is some data in the primary region, reopening the region replicas
+   * (enable/disable table, etc.) makes the region replicas readable.
+   * @throws IOException
+   */
+  @Test(timeout = 60000)
+  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
+    // Create a new table with region replication and load some data,
+    // then disable and enable the table again and verify the data from secondary
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Table table = connection.getTable(htd.getTableName())) {
+
+      HTU.loadNumericRows(table, fam, 0, 1000);
+
+      HTU.getHBaseAdmin().disableTable(htd.getTableName());
+      HTU.getHBaseAdmin().enableTable(htd.getTableName());
+
+      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
+    }
+  }
+
+  /**
+   * Tests that killing a primary region replica with unflushed data recovers.
+   */
+  @Test(timeout = 120000)
+  public void testPrimaryRegionKill() throws Exception {
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Table table = connection.getTable(htd.getTableName())) {
+
+      HTU.loadNumericRows(table, fam, 0, 1000);
+
+      // wal replication is async, we have to wait until the replication catches up, or we timeout
+      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
+      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
+
+      // we should not have flushed files now, but data in memstores of primary and secondary
+      // kill the primary region replica now, and ensure that when it comes back up, we can still
+      // read from it the same data from primary and secondaries
+      boolean aborted = false;
+      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
+        for (HRegion r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
+          if (r.getRegionInfo().getReplicaId() == 0) {
+            LOG.info("Aborting region server hosting primary region replica");
+            rs.getRegionServer().abort("for test");
+            aborted = true;
+          }
+        }
+      }
+      assertTrue(aborted);
+
+      // wal replication is async, we have to wait until the replication catches up, or we timeout
+      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
+      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
+      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
+    }
+
+    // restart the region server
+    HTU.getMiniHBaseCluster().startRegionServer();
+  }
+
+  /**
+   * wal replication is async, we have to wait until the replication catches up, or we timeout
+   */
+  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
+      final int endRow, final int replicaId, final long timeout) throws Exception {
+    try {
+      HTU.waitFor(timeout, new Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          try {
+            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
+            return true;
+          } catch (AssertionError ae) {
+            return false;
+          }
+        }
+      });
+    } catch (Throwable t) {
+      // ignore this, but redo the verify to get the actual exception
+      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
+    }
+  }
+
+  /**
+   * Tests that killing a secondary region replica with unflushed data recovers, and the replica
+   * becomes available to read again shortly.
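+   * After the kill, the secondary is expected to be reopened on another region server and to
+   * serve the same rows again.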
+   */
+  @Test(timeout = 120000)
+  public void testSecondaryRegionKill() throws Exception {
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Table table = connection.getTable(htd.getTableName())) {
+      HTU.loadNumericRows(table, fam, 0, 1000);
+
+      // wait for some time to ensure that async wal replication does its magic
+      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
+      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
+
+      // we should not have flushed files now, but data in memstores of primary and secondary
+      // kill the secondary region replica now, and ensure that when it comes back up, we can still
+      // read from it the same data
+      boolean aborted = false;
+      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
+        for (HRegion r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
+          if (r.getRegionInfo().getReplicaId() == 1) {
+            LOG.info("Aborting region server hosting secondary region replica");
+            rs.getRegionServer().abort("for test");
+            aborted = true;
+          }
+        }
+      }
+      assertTrue(aborted);
+
+      Threads.sleep(5000);
+
+      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
+      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
+    }
+
+    // restart the region server
+    HTU.getMiniHBaseCluster().startRegionServer();
+  }
+
+  /**
+   * Tests the case where there are 3 region replicas and the primary is continuously accepting
+   * new writes while one of the secondaries is killed. Verification is done for both of the
+   * secondary replicas.
+   */
+  @Test(timeout = 120000)
+  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Table table = connection.getTable(htd.getTableName());
+        Admin admin = connection.getAdmin()) {
+      // load some base data, flush it, and load some more that stays in the memstores
+      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
+      admin.flush(table.getName());
+      HTU.loadNumericRows(table, fam, 1000, 2000);
+
+      final AtomicReference<Throwable> ex = new AtomicReference<Throwable>(null);
+      final AtomicBoolean done = new AtomicBoolean(false);
+      final AtomicInteger key = new AtomicInteger(2000);
+
+      // start a thread that keeps loading new rows into the primary until told to stop
+      Thread loader = new Thread() {
+        @Override
+        public void run() {
+          while (!done.get()) {
+            try {
+              HTU.loadNumericRows(table, fam, key.get(), key.get() + 1000);
+              key.addAndGet(1000);
+            } catch (Throwable e) {
+              ex.compareAndSet(null, e);
+            }
+          }
+        }
+      };
+      loader.start();
+
+      // in parallel, abort the region server hosting the secondary region replica
+      Thread aborter = new Thread() {
+        @Override
+        public void run() {
+          try {
+            boolean aborted = false;
+            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
+              for (HRegion r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
+                if (r.getRegionInfo().getReplicaId() == 1) {
+                  LOG.info("Aborting region server hosting secondary region replica");
+                  rs.getRegionServer().abort("for test");
+                  aborted = true;
+                }
+              }
+            }
+            assertTrue(aborted);
+          } catch (Throwable e) {
+            ex.compareAndSet(null, e);
+          }
+        }
+      };
+
+      aborter.start();
+      aborter.join();
+      done.set(true);
+      loader.join();
+
+      assertNull(ex.get());
+
+      assertTrue(key.get() > 1000); // assert that the test is working as designed
+      LOG.info("Loaded up to key :" + key.get());
+      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
+      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
+      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
+    }
+
+    // restart the region server
+    HTU.getMiniHBaseCluster().startRegionServer();
+  }
+
+  /**
+   * Tests the case where we are creating a table with a lot of regions and replicas. Opening
+   * region replicas should not block handlers on RS indefinitely.
+   */
+  @Test(timeout = 120000)
+  public void testLotsOfRegionReplicas() throws IOException {
+    int numRegions = NB_SERVERS * 20;
+    int regionReplication = 10;
+    String tableName = htd.getTableName().getNameAsString() + "2";
+    htd = HTU.createTableDescriptor(tableName);
+    htd.setRegionReplication(regionReplication);
+
+    // don't care about splits themselves too much
+    byte[] startKey = Bytes.toBytes("aaa");
+    byte[] endKey = Bytes.toBytes("zzz");
+    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
+    HTU.getHBaseAdmin().createTable(htd, startKey, endKey, numRegions);
+
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Table table = connection.getTable(htd.getTableName())) {
+
+      for (int i = 1; i < splits.length; i++) {
+        for (int j = 0; j < regionReplication; j++) {
+          Get get = new Get(splits[i]);
+          get.setConsistency(Consistency.TIMELINE);
+          get.setReplicaId(j);
+          table.get(get); // this should not block. Regions should be coming online
+        }
+      }
+    }
+
+    HTU.deleteTableIfAny(TableName.valueOf(tableName));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index bb634d1b07b..ee76be2eb73 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -831,11 +831,12 @@ public class TestWALReplay {
       new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
         @Override
         protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
-            Collection<Store> storesToFlush, MonitoredTask status)
-            throws IOException {
+            final Collection<Store> storesToFlush, MonitoredTask status,
+            boolean writeFlushWalMarker)
+            throws IOException {
           LOG.info("InternalFlushCache Invoked");
           FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
-            Mockito.mock(MonitoredTask.class));
+            Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
           flushcount.incrementAndGet();
           return fs;
         };
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 23263012deb..9170d8d6777 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -98,6 +98,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     Configuration conf = HTU.getConfiguration();
     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
     // install WALObserver coprocessor for tests
     String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);