HBASE-11580 Failover handling for secondary region replicas

This commit is contained in:
Enis Soztutar 2015-03-03 11:48:12 -08:00
parent ce1b81cdfd
commit 9899aab12b
27 changed files with 1647 additions and 273 deletions

View File

@ -104,14 +104,14 @@ public class ClientSmallScanner extends ClientScanner {
if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
|| checkScanStopRow(endKey) || done) {
close();
if (LOG.isDebugEnabled()) {
LOG.debug("Finished with small scan at " + this.currentRegion);
if (LOG.isTraceEnabled()) {
LOG.trace("Finished with small scan at " + this.currentRegion);
}
return false;
}
localStartKey = endKey;
if (LOG.isDebugEnabled()) {
LOG.debug("Finished with region " + this.currentRegion);
if (LOG.isTraceEnabled()) {
LOG.trace("Finished with region " + this.currentRegion);
}
} else if (this.lastResult != null) {
localStartKey = this.lastResult.getRow();

View File

@ -94,6 +94,7 @@ public final class ConnectionUtils {
*/
public static void setServerSideHConnectionRetriesConfig(
final Configuration c, final String sn, final Log log) {
// TODO: Fix this. Not all connections from server side should have 10 times the retries.
int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
// Go big. Multiply by 10. If we can't get to meta after this many retries

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.mortbay.log.Log;
import com.google.protobuf.ServiceException;
/**
* A Callable for flushRegion() RPC.
*/
@InterfaceAudience.Private
public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionResponse> {
private final byte[] regionName;
private final boolean writeFlushWalMarker;
private boolean reload;
public FlushRegionCallable(ClusterConnection connection,
RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] regionName,
byte[] regionStartKey, boolean writeFlushWalMarker) {
super(connection, rpcControllerFactory, tableName, regionStartKey);
this.regionName = regionName;
this.writeFlushWalMarker = writeFlushWalMarker;
}
public FlushRegionCallable(ClusterConnection connection,
RpcControllerFactory rpcControllerFactory, HRegionInfo regionInfo,
boolean writeFlushWalMarker) {
this(connection, rpcControllerFactory, regionInfo.getTable(), regionInfo.getRegionName(),
regionInfo.getStartKey(), writeFlushWalMarker);
}
@Override
public FlushRegionResponse call(int callTimeout) throws Exception {
return flushRegion();
}
@Override
public void prepare(boolean reload) throws IOException {
super.prepare(reload);
this.reload = reload;
}
private FlushRegionResponse flushRegion() throws IOException {
// check whether we should still do the flush to this region. If the regions are changed due
// to splits or merges, etc return success
if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) {
if (!reload) {
throw new IOException("Cached location seems to be different than requested region.");
}
Log.info("Skipping flush region, because the located region "
+ Bytes.toStringBinary(location.getRegionInfo().getRegionName()) + " is different than "
+ " requested region " + Bytes.toStringBinary(regionName));
return FlushRegionResponse.newBuilder()
.setLastFlushTime(EnvironmentEdgeManager.currentTime())
.setFlushed(false)
.setWroteFlushWalMarker(false)
.build();
}
FlushRegionRequest request =
RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker);
try {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
return stub.flushRegion(controller, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
}

View File

@ -23,13 +23,17 @@ import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable
@ -42,25 +46,39 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
protected final ClusterConnection connection;
protected final RpcControllerFactory rpcControllerFactory;
protected AdminService.BlockingInterface stub;
protected HRegionLocation location;
protected final TableName tableName;
protected final byte[] row;
protected final int replicaId;
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
public RegionAdminServiceCallable(ClusterConnection connection, TableName tableName, byte[] row) {
this(connection, null, tableName, row);
public RegionAdminServiceCallable(ClusterConnection connection,
RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) {
this(connection, rpcControllerFactory, null, tableName, row);
}
public RegionAdminServiceCallable(ClusterConnection connection, HRegionLocation location,
public RegionAdminServiceCallable(ClusterConnection connection,
RpcControllerFactory rpcControllerFactory, HRegionLocation location,
TableName tableName, byte[] row) {
this(connection, rpcControllerFactory, location,
tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
public RegionAdminServiceCallable(ClusterConnection connection,
RpcControllerFactory rpcControllerFactory, HRegionLocation location,
TableName tableName, byte[] row, int replicaId) {
this.connection = connection;
this.rpcControllerFactory = rpcControllerFactory;
this.location = location;
this.tableName = tableName;
this.row = row;
this.replicaId = replicaId;
}
@Override
@ -85,7 +103,18 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
this.stub = stub;
}
public abstract HRegionLocation getLocation(boolean useCache) throws IOException;
public HRegionLocation getLocation(boolean useCache) throws IOException {
RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId);
if (rl == null) {
throw new HBaseIOException(getExceptionMessage());
}
HRegionLocation location = rl.getRegionLocation(replicaId);
if (location == null) {
throw new HBaseIOException(getExceptionMessage());
}
return location;
}
@Override
public void throwable(Throwable t, boolean retrying) {
@ -115,7 +144,8 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
//subclasses can override this.
protected String getExceptionMessage() {
return "There is no location";
return "There is no location" + " table=" + tableName
+ " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row);
}
@Override
@ -132,4 +162,27 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
}
return sleep;
}
public static RegionLocations getRegionLocations(
ClusterConnection connection, TableName tableName, byte[] row,
boolean useCache, int replicaId)
throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
RegionLocations rl;
try {
rl = connection.locateRegion(tableName, row, useCache, true, replicaId);
} catch (DoNotRetryIOException e) {
throw e;
} catch (RetriesExhaustedException e) {
throw e;
} catch (InterruptedIOException e) {
throw e;
} catch (IOException e) {
throw new RetriesExhaustedException("Can't get the location", e);
}
if (rl == null) {
throw new RetriesExhaustedException("Can't get the locations");
}
return rl;
}
}

View File

@ -37,22 +37,22 @@ public enum EventType {
// Messages originating from RS (NOTE: there is NO direct communication from
// RS to Master). These are a result of RS updates into ZK.
// RS_ZK_REGION_CLOSING (1), // It is replaced by M_ZK_REGION_CLOSING(HBASE-4739)
/**
* RS_ZK_REGION_CLOSED<br>
*
*
* RS has finished closing a region.
*/
RS_ZK_REGION_CLOSED (2, ExecutorType.MASTER_CLOSE_REGION),
/**
* RS_ZK_REGION_OPENING<br>
*
*
* RS is in process of opening a region.
*/
RS_ZK_REGION_OPENING (3, null),
/**
* RS_ZK_REGION_OPENED<br>
*
*
* RS has finished opening a region.
*/
RS_ZK_REGION_OPENED (4, ExecutorType.MASTER_OPEN_REGION),
@ -70,7 +70,7 @@ public enum EventType {
RS_ZK_REGION_SPLIT (6, ExecutorType.MASTER_SERVER_OPERATIONS),
/**
* RS_ZK_REGION_FAILED_OPEN<br>
*
*
* RS failed to open a region.
*/
RS_ZK_REGION_FAILED_OPEN (7, ExecutorType.MASTER_CLOSE_REGION),
@ -217,7 +217,7 @@ public enum EventType {
* Master adds this region as closing in ZK
*/
M_ZK_REGION_CLOSING (51, null),
/**
* Master controlled events to be executed on the master
* M_SERVER_SHUTDOWN
@ -232,14 +232,14 @@ public enum EventType {
M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS),
/**
* Master controlled events to be executed on the master.<br>
*
*
* M_MASTER_RECOVERY<br>
* Master is processing recovery of regions found in ZK RIT
*/
M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS),
/**
* Master controlled events to be executed on the master.<br>
*
*
* M_LOG_REPLAY<br>
* Master is processing log replay of failed region server
*/
@ -247,18 +247,25 @@ public enum EventType {
/**
* RS controlled events to be executed on the RS.<br>
*
*
* RS_PARALLEL_SEEK
*/
RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK),
/**
* RS wal recovery work items(either creating recover.edits or directly replay wals)
* to be executed on the RS.<br>
*
*
* RS_LOG_REPLAY
*/
RS_LOG_REPLAY (81, ExecutorType.RS_LOG_REPLAY_OPS);
RS_LOG_REPLAY (81, ExecutorType.RS_LOG_REPLAY_OPS),
/**
* RS flush triggering from secondary region replicas to primary region replica. <br>
*
* RS_REGION_REPLICA_FLUSH
*/
RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS);
private final int code;
private final ExecutorType executor;

View File

@ -45,7 +45,8 @@ public enum ExecutorType {
RS_CLOSE_ROOT (24),
RS_CLOSE_META (25),
RS_PARALLEL_SEEK (26),
RS_LOG_REPLAY_OPS (27);
RS_LOG_REPLAY_OPS (27),
RS_REGION_REPLICA_FLUSH_OPS (28);
ExecutorType(int value) {}

View File

@ -739,10 +739,22 @@ public final class RequestConverter {
*/
public static FlushRegionRequest
buildFlushRegionRequest(final byte[] regionName) {
return buildFlushRegionRequest(regionName, false);
}
/**
* Create a protocol buffer FlushRegionRequest for a given region name
*
* @param regionName the name of the region to get info
* @return a protocol buffer FlushRegionRequest
*/
public static FlushRegionRequest
buildFlushRegionRequest(final byte[] regionName, boolean writeFlushWALMarker) {
FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setWriteFlushWalMarker(writeFlushWALMarker);
return builder.build();
}

View File

@ -152,7 +152,9 @@ public class RetryCounter {
public void sleepUntilNextRetry() throws InterruptedException {
int attempts = getAttemptTimes();
long sleepTime = retryConfig.backoffPolicy.getBackoffTime(retryConfig, attempts);
LOG.info("Sleeping " + sleepTime + "ms before retry #" + attempts + "...");
if (LOG.isTraceEnabled()) {
LOG.trace("Sleeping " + sleepTime + "ms before retry #" + attempts + "...");
}
retryConfig.getTimeUnit().sleep(sleepTime);
useRetry();
}

View File

@ -112,11 +112,6 @@ public class IntegrationTestRegionReplicaReplication extends IntegrationTestInge
runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20);
}
@Override
protected void startMonkey() throws Exception {
// TODO: disabled for now
}
/**
* This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer
* threads to become available to the MultiThradedReader threads. We add this delay because of

View File

@ -54,7 +54,7 @@ public class RemoveColumnAction extends Action {
HTableDescriptor tableDescriptor = admin.getTableDescriptor(tableName);
HColumnDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies();
if (columnDescriptors.length <= 1) {
if (columnDescriptors.length <= (protectedColumns == null ? 1 : protectedColumns.size())) {
return;
}

View File

@ -7996,6 +7996,24 @@ public final class AdminProtos {
* <code>optional uint64 if_older_than_ts = 2;</code>
*/
long getIfOlderThanTs();
// optional bool write_flush_wal_marker = 3;
/**
* <code>optional bool write_flush_wal_marker = 3;</code>
*
* <pre>
* whether to write a marker to WAL even if not flushed
* </pre>
*/
boolean hasWriteFlushWalMarker();
/**
* <code>optional bool write_flush_wal_marker = 3;</code>
*
* <pre>
* whether to write a marker to WAL even if not flushed
* </pre>
*/
boolean getWriteFlushWalMarker();
}
/**
* Protobuf type {@code FlushRegionRequest}
@ -8073,6 +8091,11 @@ public final class AdminProtos {
ifOlderThanTs_ = input.readUInt64();
break;
}
case 24: {
bitField0_ |= 0x00000004;
writeFlushWalMarker_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -8151,9 +8174,34 @@ public final class AdminProtos {
return ifOlderThanTs_;
}
// optional bool write_flush_wal_marker = 3;
public static final int WRITE_FLUSH_WAL_MARKER_FIELD_NUMBER = 3;
private boolean writeFlushWalMarker_;
/**
* <code>optional bool write_flush_wal_marker = 3;</code>
*
* <pre>
* whether to write a marker to WAL even if not flushed
* </pre>
*/
public boolean hasWriteFlushWalMarker() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional bool write_flush_wal_marker = 3;</code>
*
* <pre>
* whether to write a marker to WAL even if not flushed
* </pre>
*/
public boolean getWriteFlushWalMarker() {
return writeFlushWalMarker_;
}
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
ifOlderThanTs_ = 0L;
writeFlushWalMarker_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -8181,6 +8229,9 @@ public final class AdminProtos {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeUInt64(2, ifOlderThanTs_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBool(3, writeFlushWalMarker_);
}
getUnknownFields().writeTo(output);
}
@ -8198,6 +8249,10 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(2, ifOlderThanTs_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, writeFlushWalMarker_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -8231,6 +8286,11 @@ public final class AdminProtos {
result = result && (getIfOlderThanTs()
== other.getIfOlderThanTs());
}
result = result && (hasWriteFlushWalMarker() == other.hasWriteFlushWalMarker());
if (hasWriteFlushWalMarker()) {
result = result && (getWriteFlushWalMarker()
== other.getWriteFlushWalMarker());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -8252,6 +8312,10 @@ public final class AdminProtos {
hash = (37 * hash) + IF_OLDER_THAN_TS_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getIfOlderThanTs());
}
if (hasWriteFlushWalMarker()) {
hash = (37 * hash) + WRITE_FLUSH_WAL_MARKER_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getWriteFlushWalMarker());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -8377,6 +8441,8 @@ public final class AdminProtos {
bitField0_ = (bitField0_ & ~0x00000001);
ifOlderThanTs_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
writeFlushWalMarker_ = false;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -8417,6 +8483,10 @@ public final class AdminProtos {
to_bitField0_ |= 0x00000002;
}
result.ifOlderThanTs_ = ifOlderThanTs_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.writeFlushWalMarker_ = writeFlushWalMarker_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -8439,6 +8509,9 @@ public final class AdminProtos {
if (other.hasIfOlderThanTs()) {
setIfOlderThanTs(other.getIfOlderThanTs());
}
if (other.hasWriteFlushWalMarker()) {
setWriteFlushWalMarker(other.getWriteFlushWalMarker());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -8624,6 +8697,55 @@ public final class AdminProtos {
return this;
}
// optional bool write_flush_wal_marker = 3;
private boolean writeFlushWalMarker_ ;
/**
* <code>optional bool write_flush_wal_marker = 3;</code>
*
* <pre>
* whether to write a marker to WAL even if not flushed
* </pre>
*/
public boolean hasWriteFlushWalMarker() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional bool write_flush_wal_marker = 3;</code>
*
* <pre>
* whether to write a marker to WAL even if not flushed
* </pre>
*/
public boolean getWriteFlushWalMarker() {
return writeFlushWalMarker_;
}
/**
* <code>optional bool write_flush_wal_marker = 3;</code>
*
* <pre>
* whether to write a marker to WAL even if not flushed
* </pre>
*/
public Builder setWriteFlushWalMarker(boolean value) {
bitField0_ |= 0x00000004;
writeFlushWalMarker_ = value;
onChanged();
return this;
}
/**
* <code>optional bool write_flush_wal_marker = 3;</code>
*
* <pre>
* whether to write a marker to WAL even if not flushed
* </pre>
*/
public Builder clearWriteFlushWalMarker() {
bitField0_ = (bitField0_ & ~0x00000004);
writeFlushWalMarker_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:FlushRegionRequest)
}
@ -8657,6 +8779,16 @@ public final class AdminProtos {
* <code>optional bool flushed = 2;</code>
*/
boolean getFlushed();
// optional bool wrote_flush_wal_marker = 3;
/**
* <code>optional bool wrote_flush_wal_marker = 3;</code>
*/
boolean hasWroteFlushWalMarker();
/**
* <code>optional bool wrote_flush_wal_marker = 3;</code>
*/
boolean getWroteFlushWalMarker();
}
/**
* Protobuf type {@code FlushRegionResponse}
@ -8719,6 +8851,11 @@ public final class AdminProtos {
flushed_ = input.readBool();
break;
}
case 24: {
bitField0_ |= 0x00000004;
wroteFlushWalMarker_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -8791,9 +8928,26 @@ public final class AdminProtos {
return flushed_;
}
// optional bool wrote_flush_wal_marker = 3;
public static final int WROTE_FLUSH_WAL_MARKER_FIELD_NUMBER = 3;
private boolean wroteFlushWalMarker_;
/**
* <code>optional bool wrote_flush_wal_marker = 3;</code>
*/
public boolean hasWroteFlushWalMarker() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional bool wrote_flush_wal_marker = 3;</code>
*/
public boolean getWroteFlushWalMarker() {
return wroteFlushWalMarker_;
}
private void initFields() {
lastFlushTime_ = 0L;
flushed_ = false;
wroteFlushWalMarker_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -8817,6 +8971,9 @@ public final class AdminProtos {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBool(2, flushed_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBool(3, wroteFlushWalMarker_);
}
getUnknownFields().writeTo(output);
}
@ -8834,6 +8991,10 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(2, flushed_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, wroteFlushWalMarker_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -8867,6 +9028,11 @@ public final class AdminProtos {
result = result && (getFlushed()
== other.getFlushed());
}
result = result && (hasWroteFlushWalMarker() == other.hasWroteFlushWalMarker());
if (hasWroteFlushWalMarker()) {
result = result && (getWroteFlushWalMarker()
== other.getWroteFlushWalMarker());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -8888,6 +9054,10 @@ public final class AdminProtos {
hash = (37 * hash) + FLUSHED_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getFlushed());
}
if (hasWroteFlushWalMarker()) {
hash = (37 * hash) + WROTE_FLUSH_WAL_MARKER_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getWroteFlushWalMarker());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -9001,6 +9171,8 @@ public final class AdminProtos {
bitField0_ = (bitField0_ & ~0x00000001);
flushed_ = false;
bitField0_ = (bitField0_ & ~0x00000002);
wroteFlushWalMarker_ = false;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -9037,6 +9209,10 @@ public final class AdminProtos {
to_bitField0_ |= 0x00000002;
}
result.flushed_ = flushed_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.wroteFlushWalMarker_ = wroteFlushWalMarker_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -9059,6 +9235,9 @@ public final class AdminProtos {
if (other.hasFlushed()) {
setFlushed(other.getFlushed());
}
if (other.hasWroteFlushWalMarker()) {
setWroteFlushWalMarker(other.getWroteFlushWalMarker());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -9156,6 +9335,39 @@ public final class AdminProtos {
return this;
}
// optional bool wrote_flush_wal_marker = 3;
private boolean wroteFlushWalMarker_ ;
/**
* <code>optional bool wrote_flush_wal_marker = 3;</code>
*/
public boolean hasWroteFlushWalMarker() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional bool wrote_flush_wal_marker = 3;</code>
*/
public boolean getWroteFlushWalMarker() {
return wroteFlushWalMarker_;
}
/**
* <code>optional bool wrote_flush_wal_marker = 3;</code>
*/
public Builder setWroteFlushWalMarker(boolean value) {
bitField0_ |= 0x00000004;
wroteFlushWalMarker_ = value;
onChanged();
return this;
}
/**
* <code>optional bool wrote_flush_wal_marker = 3;</code>
*/
public Builder clearWroteFlushWalMarker() {
bitField0_ = (bitField0_ & ~0x00000004);
wroteFlushWalMarker_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:FlushRegionResponse)
}
@ -22073,66 +22285,67 @@ public final class AdminProtos {
"n_ZK\030\003 \001(\010:\004true\022\'\n\022destination_server\030\004" +
" \001(\0132\013.ServerName\022\027\n\017serverStartCode\030\005 \001" +
"(\004\"%\n\023CloseRegionResponse\022\016\n\006closed\030\001 \002(",
"\010\"P\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" +
"\010\"p\n\022FlushRegionRequest\022 \n\006region\030\001 \002(\0132" +
"\020.RegionSpecifier\022\030\n\020if_older_than_ts\030\002 " +
"\001(\004\"?\n\023FlushRegionResponse\022\027\n\017last_flush" +
"_time\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010\"K\n\022SplitReg" +
"ionRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpeci" +
"fier\022\023\n\013split_point\030\002 \001(\014\"\025\n\023SplitRegion" +
"Response\"W\n\024CompactRegionRequest\022 \n\006regi" +
"on\030\001 \002(\0132\020.RegionSpecifier\022\r\n\005major\030\002 \001(" +
"\010\022\016\n\006family\030\003 \001(\014\"\027\n\025CompactRegionRespon" +
"se\"\262\001\n\031UpdateFavoredNodesRequest\022@\n\013upda",
"te_info\030\001 \003(\0132+.UpdateFavoredNodesReques" +
"t.RegionUpdateInfo\032S\n\020RegionUpdateInfo\022\033" +
"\n\006region\030\001 \002(\0132\013.RegionInfo\022\"\n\rfavored_n" +
"odes\030\002 \003(\0132\013.ServerName\".\n\032UpdateFavored" +
"NodesResponse\022\020\n\010response\030\001 \001(\r\"v\n\023Merge" +
"RegionsRequest\022\"\n\010region_a\030\001 \002(\0132\020.Regio" +
"nSpecifier\022\"\n\010region_b\030\002 \002(\0132\020.RegionSpe" +
"cifier\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024Merge" +
"RegionsResponse\"X\n\010WALEntry\022\024\n\003key\030\001 \002(\013" +
"2\007.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as",
"sociated_cell_count\030\003 \001(\005\"4\n\030ReplicateWA" +
"LEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"" +
"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollWALW" +
"riterRequest\"0\n\025RollWALWriterResponse\022\027\n" +
"\017region_to_flush\030\001 \003(\014\"#\n\021StopServerRequ" +
"est\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespons" +
"e\"\026\n\024GetServerInfoRequest\"B\n\nServerInfo\022" +
" \n\013server_name\030\001 \002(\0132\013.ServerName\022\022\n\nweb" +
"ui_port\030\002 \001(\r\"9\n\025GetServerInfoResponse\022 " +
"\n\013server_info\030\001 \002(\0132\013.ServerInfo\"\034\n\032Upda",
"teConfigurationRequest\"\035\n\033UpdateConfigur" +
"ationResponse2\230\010\n\014AdminService\022>\n\rGetReg" +
"ionInfo\022\025.GetRegionInfoRequest\032\026.GetRegi" +
"onInfoResponse\022;\n\014GetStoreFile\022\024.GetStor" +
"eFileRequest\032\025.GetStoreFileResponse\022D\n\017G" +
"etOnlineRegion\022\027.GetOnlineRegionRequest\032" +
"\030.GetOnlineRegionResponse\0225\n\nOpenRegion\022" +
"\022.OpenRegionRequest\032\023.OpenRegionResponse" +
"\0228\n\013CloseRegion\022\023.CloseRegionRequest\032\024.C" +
"loseRegionResponse\0228\n\013FlushRegion\022\023.Flus",
"hRegionRequest\032\024.FlushRegionResponse\0228\n\013" +
"SplitRegion\022\023.SplitRegionRequest\032\024.Split" +
"RegionResponse\022>\n\rCompactRegion\022\025.Compac" +
"tRegionRequest\032\026.CompactRegionResponse\022;" +
"\n\014MergeRegions\022\024.MergeRegionsRequest\032\025.M" +
"ergeRegionsResponse\022J\n\021ReplicateWALEntry" +
"\022\031.ReplicateWALEntryRequest\032\032.ReplicateW" +
"ALEntryResponse\022?\n\006Replay\022\031.ReplicateWAL" +
"EntryRequest\032\032.ReplicateWALEntryResponse" +
"\022>\n\rRollWALWriter\022\025.RollWALWriterRequest",
"\032\026.RollWALWriterResponse\022>\n\rGetServerInf" +
"o\022\025.GetServerInfoRequest\032\026.GetServerInfo" +
"Response\0225\n\nStopServer\022\022.StopServerReque" +
"st\032\023.StopServerResponse\022M\n\022UpdateFavored" +
"Nodes\022\032.UpdateFavoredNodesRequest\032\033.Upda" +
"teFavoredNodesResponse\022P\n\023UpdateConfigur" +
"ation\022\033.UpdateConfigurationRequest\032\034.Upd" +
"ateConfigurationResponseBA\n*org.apache.h" +
"adoop.hbase.protobuf.generatedB\013AdminPro" +
"tosH\001\210\001\001\240\001\001"
"\001(\004\022\036\n\026write_flush_wal_marker\030\003 \001(\010\"_\n\023F" +
"lushRegionResponse\022\027\n\017last_flush_time\030\001 " +
"\002(\004\022\017\n\007flushed\030\002 \001(\010\022\036\n\026wrote_flush_wal_" +
"marker\030\003 \001(\010\"K\n\022SplitRegionRequest\022 \n\006re" +
"gion\030\001 \002(\0132\020.RegionSpecifier\022\023\n\013split_po" +
"int\030\002 \001(\014\"\025\n\023SplitRegionResponse\"W\n\024Comp" +
"actRegionRequest\022 \n\006region\030\001 \002(\0132\020.Regio" +
"nSpecifier\022\r\n\005major\030\002 \001(\010\022\016\n\006family\030\003 \001(",
"\014\"\027\n\025CompactRegionResponse\"\262\001\n\031UpdateFav" +
"oredNodesRequest\022@\n\013update_info\030\001 \003(\0132+." +
"UpdateFavoredNodesRequest.RegionUpdateIn" +
"fo\032S\n\020RegionUpdateInfo\022\033\n\006region\030\001 \002(\0132\013" +
".RegionInfo\022\"\n\rfavored_nodes\030\002 \003(\0132\013.Ser" +
"verName\".\n\032UpdateFavoredNodesResponse\022\020\n" +
"\010response\030\001 \001(\r\"v\n\023MergeRegionsRequest\022\"" +
"\n\010region_a\030\001 \002(\0132\020.RegionSpecifier\022\"\n\010re" +
"gion_b\030\002 \002(\0132\020.RegionSpecifier\022\027\n\010forcib" +
"le\030\003 \001(\010:\005false\"\026\n\024MergeRegionsResponse\"",
"X\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.WALKey\022\027\n\017key" +
"_value_bytes\030\002 \003(\014\022\035\n\025associated_cell_co" +
"unt\030\003 \001(\005\"4\n\030ReplicateWALEntryRequest\022\030\n" +
"\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWALE" +
"ntryResponse\"\026\n\024RollWALWriterRequest\"0\n\025" +
"RollWALWriterResponse\022\027\n\017region_to_flush" +
"\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 " +
"\002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerIn" +
"foRequest\"B\n\nServerInfo\022 \n\013server_name\030\001" +
" \002(\0132\013.ServerName\022\022\n\nwebui_port\030\002 \001(\r\"9\n",
"\025GetServerInfoResponse\022 \n\013server_info\030\001 " +
"\002(\0132\013.ServerInfo\"\034\n\032UpdateConfigurationR" +
"equest\"\035\n\033UpdateConfigurationResponse2\230\010" +
"\n\014AdminService\022>\n\rGetRegionInfo\022\025.GetReg" +
"ionInfoRequest\032\026.GetRegionInfoResponse\022;" +
"\n\014GetStoreFile\022\024.GetStoreFileRequest\032\025.G" +
"etStoreFileResponse\022D\n\017GetOnlineRegion\022\027" +
".GetOnlineRegionRequest\032\030.GetOnlineRegio" +
"nResponse\0225\n\nOpenRegion\022\022.OpenRegionRequ" +
"est\032\023.OpenRegionResponse\0228\n\013CloseRegion\022",
"\023.CloseRegionRequest\032\024.CloseRegionRespon" +
"se\0228\n\013FlushRegion\022\023.FlushRegionRequest\032\024" +
".FlushRegionResponse\0228\n\013SplitRegion\022\023.Sp" +
"litRegionRequest\032\024.SplitRegionResponse\022>" +
"\n\rCompactRegion\022\025.CompactRegionRequest\032\026" +
".CompactRegionResponse\022;\n\014MergeRegions\022\024" +
".MergeRegionsRequest\032\025.MergeRegionsRespo" +
"nse\022J\n\021ReplicateWALEntry\022\031.ReplicateWALE" +
"ntryRequest\032\032.ReplicateWALEntryResponse\022" +
"?\n\006Replay\022\031.ReplicateWALEntryRequest\032\032.R",
"eplicateWALEntryResponse\022>\n\rRollWALWrite" +
"r\022\025.RollWALWriterRequest\032\026.RollWALWriter" +
"Response\022>\n\rGetServerInfo\022\025.GetServerInf" +
"oRequest\032\026.GetServerInfoResponse\0225\n\nStop" +
"Server\022\022.StopServerRequest\032\023.StopServerR" +
"esponse\022M\n\022UpdateFavoredNodes\022\032.UpdateFa" +
"voredNodesRequest\032\033.UpdateFavoredNodesRe" +
"sponse\022P\n\023UpdateConfiguration\022\033.UpdateCo" +
"nfigurationRequest\032\034.UpdateConfiguration" +
"ResponseBA\n*org.apache.hadoop.hbase.prot",
"obuf.generatedB\013AdminProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -22210,13 +22423,13 @@ public final class AdminProtos {
internal_static_FlushRegionRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_FlushRegionRequest_descriptor,
new java.lang.String[] { "Region", "IfOlderThanTs", });
new java.lang.String[] { "Region", "IfOlderThanTs", "WriteFlushWalMarker", });
internal_static_FlushRegionResponse_descriptor =
getDescriptor().getMessageTypes().get(11);
internal_static_FlushRegionResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_FlushRegionResponse_descriptor,
new java.lang.String[] { "LastFlushTime", "Flushed", });
new java.lang.String[] { "LastFlushTime", "Flushed", "WroteFlushWalMarker", });
internal_static_SplitRegionRequest_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_SplitRegionRequest_fieldAccessorTable = new

View File

@ -5695,6 +5695,14 @@ public final class WALProtos {
* <code>ABORT_FLUSH = 2;</code>
*/
ABORT_FLUSH(2, 2),
/**
* <code>CANNOT_FLUSH = 3;</code>
*
* <pre>
* marker for indicating that a flush has been requested but cannot complete
* </pre>
*/
CANNOT_FLUSH(3, 3),
;
/**
@ -5709,6 +5717,14 @@ public final class WALProtos {
* <code>ABORT_FLUSH = 2;</code>
*/
public static final int ABORT_FLUSH_VALUE = 2;
/**
* <code>CANNOT_FLUSH = 3;</code>
*
* <pre>
* marker for indicating that a flush has been requested but cannot complete
* </pre>
*/
public static final int CANNOT_FLUSH_VALUE = 3;
public final int getNumber() { return value; }
@ -5718,6 +5734,7 @@ public final class WALProtos {
case 0: return START_FLUSH;
case 1: return COMMIT_FLUSH;
case 2: return ABORT_FLUSH;
case 3: return CANNOT_FLUSH;
default: return null;
}
}
@ -11848,7 +11865,7 @@ public final class WALProtos {
"n_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020com" +
"paction_input\030\004 \003(\t\022\031\n\021compaction_output" +
"\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region" +
"_name\030\007 \001(\014\"\200\003\n\017FlushDescriptor\022,\n\006actio" +
"_name\030\007 \001(\014\"\222\003\n\017FlushDescriptor\022,\n\006actio" +
"n\030\001 \002(\0162\034.FlushDescriptor.FlushAction\022\022\n",
"\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_name" +
"\030\003 \002(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n" +
@ -11856,25 +11873,26 @@ public final class WALProtos {
"toreFlushDescriptor\022\023\n\013region_name\030\006 \001(\014" +
"\032Y\n\024StoreFlushDescriptor\022\023\n\013family_name\030" +
"\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\024\n\014flush_o" +
"utput\030\003 \003(\t\"A\n\013FlushAction\022\017\n\013START_FLUS" +
"H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"R" +
"\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026" +
"\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003",
"(\t\"\215\001\n\022BulkLoadDescriptor\022\036\n\ntable_name\030" +
"\001 \002(\0132\n.TableName\022\033\n\023encoded_region_name" +
"\030\002 \002(\014\022 \n\006stores\030\003 \003(\0132\020.StoreDescriptor" +
"\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\237\002\n\025RegionEve" +
"ntDescriptor\0224\n\nevent_type\030\001 \002(\0162 .Regio" +
"nEventDescriptor.EventType\022\022\n\ntable_name" +
"\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022\033\n\023l" +
"og_sequence_number\030\004 \001(\004\022 \n\006stores\030\005 \003(\013" +
"2\020.StoreDescriptor\022\033\n\006server\030\006 \001(\0132\013.Ser" +
"verName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventTyp",
"e\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\n" +
"WALTrailer*F\n\tScopeType\022\033\n\027REPLICATION_S" +
"COPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL" +
"\020\001B?\n*org.apache.hadoop.hbase.protobuf.g" +
"eneratedB\tWALProtosH\001\210\001\000\240\001\001"
"utput\030\003 \003(\t\"S\n\013FlushAction\022\017\n\013START_FLUS" +
"H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\022\020" +
"\n\014CANNOT_FLUSH\020\003\"R\n\017StoreDescriptor\022\023\n\013f" +
"amily_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t",
"\022\022\n\nstore_file\030\003 \003(\t\"\215\001\n\022BulkLoadDescrip" +
"tor\022\036\n\ntable_name\030\001 \002(\0132\n.TableName\022\033\n\023e" +
"ncoded_region_name\030\002 \002(\014\022 \n\006stores\030\003 \003(\013" +
"2\020.StoreDescriptor\022\030\n\020bulkload_seq_num\030\004" +
" \002(\003\"\237\002\n\025RegionEventDescriptor\0224\n\nevent_" +
"type\030\001 \002(\0162 .RegionEventDescriptor.Event" +
"Type\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_regi" +
"on_name\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 \001" +
"(\004\022 \n\006stores\030\005 \003(\0132\020.StoreDescriptor\022\033\n\006" +
"server\030\006 \001(\0132\013.ServerName\022\023\n\013region_name",
"\030\007 \001(\014\".\n\tEventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014" +
"REGION_CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScopeTyp" +
"e\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLIC" +
"ATION_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoo" +
"p.hbase.protobuf.generatedB\tWALProtosH\001\210" +
"\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View File

@ -115,11 +115,13 @@ message CloseRegionResponse {
message FlushRegionRequest {
required RegionSpecifier region = 1;
optional uint64 if_older_than_ts = 2;
optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed
}
message FlushRegionResponse {
required uint64 last_flush_time = 1;
optional bool flushed = 2;
optional bool wrote_flush_wal_marker = 3;
}
/**

View File

@ -109,6 +109,7 @@ message FlushDescriptor {
START_FLUSH = 0;
COMMIT_FLUSH = 1;
ABORT_FLUSH = 2;
CANNOT_FLUSH = 3; // marker for indicating that a flush has been requested but cannot complete
}
message StoreFlushDescriptor {

View File

@ -480,6 +480,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
final Result result;
final String failureReason;
final long flushSequenceId;
final boolean wroteFlushWalMarker;
/**
* Convenience constructor to use when the flush is successful, the failure message is set to
@ -489,7 +490,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* memstores.
*/
FlushResult(Result result, long flushSequenceId) {
this(result, flushSequenceId, null);
this(result, flushSequenceId, null, false);
assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
.FLUSHED_COMPACTION_NEEDED;
}
@ -499,8 +500,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
* @param failureReason Reason why we couldn't flush.
*/
FlushResult(Result result, String failureReason) {
this(result, -1, failureReason);
FlushResult(Result result, String failureReason, boolean wroteFlushMarker) {
this(result, -1, failureReason, wroteFlushMarker);
assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
}
@ -510,10 +511,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
* @param failureReason Reason why we couldn't flush, or null.
*/
FlushResult(Result result, long flushSequenceId, String failureReason) {
FlushResult(Result result, long flushSequenceId, String failureReason,
boolean wroteFlushMarker) {
this.result = result;
this.flushSequenceId = flushSequenceId;
this.failureReason = failureReason;
this.wroteFlushWalMarker = wroteFlushMarker;
}
/**
@ -1787,7 +1790,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public FlushResult flushcache() throws IOException {
return flushcache(true);
return flushcache(true, false);
}
/**
@ -1811,11 +1814,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* because a Snapshot was not properly persisted.
*/
public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
return flushcache(forceFlushAllStores, false);
}
/**
* Flush the cache.
*
* When this method is called the cache will be flushed unless:
* <ol>
* <li>the cache is empty</li>
* <li>the region is closed.</li>
* <li>a flush is already in progress</li>
* <li>writes are disabled</li>
* </ol>
*
* <p>This method may block for some time, so it should not be called from a
* time-sensitive thread.
* @param forceFlushAllStores whether we want to flush all stores
* @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
* @return whether the flush is success and whether the region needs compacting
*
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
*/
public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
throws IOException {
// fail-fast instead of waiting on the lock
if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing";
LOG.debug(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
status.setStatus("Acquiring readlock on region");
@ -1826,7 +1856,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
String msg = "Skipping flush on " + this + " because closed";
LOG.debug(msg);
status.abort(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-flush hooks");
@ -1851,14 +1881,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
+ (writestate.flushing ? "already flushing"
: "writes not enabled");
status.abort(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
}
try {
Collection<Store> specificStoresToFlush =
forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
FlushResult fs = internalFlushcache(specificStoresToFlush, status);
FlushResult fs = internalFlushcache(specificStoresToFlush,
status, writeFlushRequestWalMarker);
if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks");
@ -1955,7 +1986,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
private FlushResult internalFlushcache(MonitoredTask status)
throws IOException {
return internalFlushcache(stores.values(), status);
return internalFlushcache(stores.values(), status, false);
}
/**
@ -1964,9 +1995,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @see #internalFlushcache(WAL, long, Collection, MonitoredTask)
*/
private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
MonitoredTask status) throws IOException {
MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
status);
status, writeFlushWalMarker);
}
/**
@ -1998,9 +2029,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* properly persisted.
*/
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
throws IOException {
PrepareFlushResult result
= internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false);
= internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
if (result.result == null) {
return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
} else {
@ -2010,7 +2042,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
protected PrepareFlushResult internalPrepareFlushCache(
final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
MonitoredTask status, boolean isReplay)
MonitoredTask status, boolean writeFlushWalMarker)
throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
@ -2036,14 +2068,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
w = mvcc.beginMemstoreInsert();
long flushSeqId = getNextSequenceId(wal);
FlushResult flushResult = new FlushResult(
FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush",
writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
w.setWriteNumber(flushSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
w = null;
return new PrepareFlushResult(flushResult, myseqid);
} else {
return new PrepareFlushResult(
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"),
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush",
false),
myseqid);
}
}
@ -2110,7 +2144,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
String msg = "Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
status.setStatus(msg);
return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg),
return new PrepareFlushResult(
new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false),
myseqid);
}
flushOpSeqId = getNextSequenceId(wal);
@ -2198,6 +2233,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
flushedSeqId, totalFlushableSizeOfFlushableStores);
}
/**
* Writes a marker to WAL indicating a flush is requested but cannot be complete due to various
* reasons. Ignores exceptions from WAL. Returns whether the write succeeded.
* @param wal
* @return
*/
private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
getRegionInfo(), -1, new TreeMap<byte[], List<Path>>());
try {
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, true);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received exception while trying to write the flush request to wal", e);
}
}
return false;
}
protected FlushResult internalFlushCacheAndCommit(
final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
final Collection<Store> storesToFlush)
@ -2267,8 +2324,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable ex) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
StringUtils.stringifyException(ex));
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
+ StringUtils.stringifyException(ex));
// ignore this since we will be aborting the RS with DSE.
}
wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
@ -3546,7 +3604,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
protected void checkReadsEnabled() throws IOException {
if (!this.writestate.readsEnabled) {
throw new IOException("The region's reads are disabled. Cannot serve the request");
throw new IOException(getRegionInfo().getEncodedName()
+ ": The region's reads are disabled. Cannot serve the request");
}
}
@ -3835,7 +3894,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, stores.values(), status);
internalFlushcache(null, seqid, stores.values(), status, false);
}
// Now delete the content of recovered edits. We're done w/ them.
if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
@ -3937,7 +3996,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (currentEditSeqId > key.getLogSeqNum()) {
// when this condition is true, it means we have a serious defect because we need to
// maintain increasing SeqId for WAL edits per region
LOG.error("Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
LOG.error(getRegionInfo().getEncodedName() + " : "
+ "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
+ "; edit=" + val);
} else {
currentEditSeqId = key.getLogSeqNum();
@ -4001,7 +4061,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
editsCount++;
}
if (flush) {
internalFlushcache(null, currentEditSeqId, stores.values(), status);
internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
}
if (coprocessorHost != null) {
@ -4060,18 +4120,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
"Compaction marker from WAL ", compaction);
if (replaySeqId < lastReplayedOpenRegionSeqId) {
LOG.warn("Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug(getRegionInfo().getEncodedName() + " : "
+ "Replaying compaction marker " + TextFormat.shortDebugString(compaction));
}
startRegionOperation(Operation.REPLAY_EVENT);
try {
Store store = this.getStore(compaction.getFamilyName().toByteArray());
if (store == null) {
LOG.warn("Found Compaction WAL edit for deleted family:" +
Bytes.toString(compaction.getFamilyName().toByteArray()));
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Found Compaction WAL edit for deleted family:"
+ Bytes.toString(compaction.getFamilyName().toByteArray()));
return;
}
store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
@ -4080,7 +4147,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
void replayWALFlushMarker(FlushDescriptor flush) throws IOException {
void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
"Flush marker from WAL ", flush);
@ -4089,7 +4156,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
if (LOG.isDebugEnabled()) {
LOG.debug("Replaying flush marker " + TextFormat.shortDebugString(flush));
LOG.debug(getRegionInfo().getEncodedName() + " : "
+ "Replaying flush marker " + TextFormat.shortDebugString(flush));
}
startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to guard against close
@ -4105,9 +4173,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
case ABORT_FLUSH:
replayWALFlushAbortMarker(flush);
break;
case CANNOT_FLUSH:
replayWALFlushCannotFlushMarker(flush, replaySeqId);
break;
default:
LOG.warn("Received a flush event with unknown action, ignoring. "
+ TextFormat.shortDebugString(flush));
LOG.warn(getRegionInfo().getEncodedName() + " : " +
"Received a flush event with unknown action, ignoring. " +
TextFormat.shortDebugString(flush));
break;
}
} finally {
@ -4128,7 +4200,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
byte[] family = storeFlush.getFamilyName().toByteArray();
Store store = getStore(family);
if (store == null) {
LOG.info("Received a flush start marker from primary, but the family is not found. Ignoring"
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush start marker from primary, but the family is not found. Ignoring"
+ " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
continue;
}
@ -4142,9 +4215,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
synchronized (writestate) {
try {
if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
return null;
}
if (numMutationsWithoutWAL.get() > 0) {
@ -4158,7 +4232,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal
PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
flushSeqId, storesToFlush, status, true);
flushSeqId, storesToFlush, status, false);
if (prepareResult.result == null) {
// save the PrepareFlushResult so that we can use it later from commit flush
this.writestate.flushing = true;
@ -4169,6 +4243,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
+ " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
}
} else {
// special case empty memstore. We will still save the flush result in this case, since
// our memstore ie empty, but the primary is still flushing
if (prepareResult.result.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
this.writestate.flushing = true;
this.prepareFlushResult = prepareResult;
if (LOG.isDebugEnabled()) {
LOG.debug(getRegionInfo().getEncodedName() + " : "
+ " Prepared empty flush with seqId:" + flush.getFlushSequenceNumber());
}
}
status.abort("Flush prepare failed with " + prepareResult.result);
// nothing much to do. prepare flush failed because of some reason.
}
@ -4177,20 +4261,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// we already have an active snapshot.
if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
// They define the same flush. Log and continue.
LOG.warn("Received a flush prepare marker with the same seqId: " +
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush prepare marker with the same seqId: " +
+ flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ prepareFlushResult.flushOpSeqId + ". Ignoring");
// ignore
} else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
// We received a flush with a smaller seqNum than what we have prepared. We can only
// ignore this prepare flush request.
LOG.warn("Received a flush prepare marker with a smaller seqId: " +
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush prepare marker with a smaller seqId: " +
+ flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ prepareFlushResult.flushOpSeqId + ". Ignoring");
// ignore
} else {
// We received a flush with a larger seqNum than what we have prepared
LOG.warn("Received a flush prepare marker with a larger seqId: " +
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush prepare marker with a larger seqId: " +
+ flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ prepareFlushResult.flushOpSeqId + ". Ignoring");
// We do not have multiple active snapshots in the memstore or a way to merge current
@ -4225,7 +4312,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
synchronized (writestate) {
try {
if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
return;
@ -4253,7 +4341,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// we received a flush commit with a smaller seqId than what we have prepared
// we will pick the flush file up from this commit (if we have not seen it), but we
// will not drop the memstore
LOG.warn("Received a flush commit marker with smaller seqId: "
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush commit marker with smaller seqId: "
+ flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
+ prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
+" prepared memstore snapshot");
@ -4267,7 +4356,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// we will pick the flush file for this. We will also obtain the updates lock and
// look for contents of the memstore to see whether we have edits after this seqId.
// If not, we will drop all the memstore edits and the snapshot as well.
LOG.warn("Received a flush commit marker with larger seqId: "
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush commit marker with larger seqId: "
+ flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
+" memstore snapshot");
@ -4284,6 +4374,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.prepareFlushResult = null;
writestate.flushing = false;
}
// If we were waiting for observing a flush or region opening event for not showing
// partial data after a secondary region crash, we can allow reads now. We can only make
// sure that we are not showing partial data (for example skipping some previous edits)
// until we observe a full flush start and flush commit. So if we were not able to find
// a previous flush we will not enable reads now.
this.setReadsEnabled(true);
} else {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
@ -4337,14 +4433,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
byte[] family = storeFlush.getFamilyName().toByteArray();
Store store = getStore(family);
if (store == null) {
LOG.warn("Received a flush commit marker from primary, but the family is not found." +
"Ignoring StoreFlushDescriptor:" + storeFlush);
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush commit marker from primary, but the family is not found."
+ "Ignoring StoreFlushDescriptor:" + storeFlush);
continue;
}
List<String> flushFiles = storeFlush.getFlushOutputList();
StoreFlushContext ctx = null;
long startTime = EnvironmentEdgeManager.currentTime();
if (prepareFlushResult == null) {
if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
ctx = store.createFlushContext(flush.getFlushSequenceNumber());
} else {
ctx = prepareFlushResult.storeFlushCtxs.get(family);
@ -4352,7 +4449,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
if (ctx == null) {
LOG.warn("Unexpected: flush commit marker received from store "
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Unexpected: flush commit marker received from store "
+ Bytes.toString(family) + " but no associated flush context. Ignoring");
continue;
}
@ -4376,7 +4474,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long currentSeqId = getSequenceId().get();
if (seqId >= currentSeqId) {
// then we can drop the memstore contents since everything is below this seqId
LOG.info("Dropping memstore contents as well since replayed flush seqId: "
LOG.info(getRegionInfo().getEncodedName() + " : "
+ "Dropping memstore contents as well since replayed flush seqId: "
+ seqId + " is greater than current seqId:" + currentSeqId);
// Prepare flush (take a snapshot) and then abort (drop the snapshot)
@ -4388,7 +4487,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
dropStoreMemstoreContentsForSeqId(store, currentSeqId);
}
} else {
LOG.info("Not dropping memstore contents since replayed flush seqId: "
LOG.info(getRegionInfo().getEncodedName() + " : "
+ "Not dropping memstore contents since replayed flush seqId: "
+ seqId + " is smaller than current seqId:" + currentSeqId);
}
} finally {
@ -4409,6 +4509,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// that will drop the snapshot
}
private void replayWALFlushCannotFlushMarker(FlushDescriptor flush, long replaySeqId) {
synchronized (writestate) {
if (this.lastReplayedOpenRegionSeqId > replaySeqId) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ " because its sequence id " + replaySeqId + " is smaller than this regions "
+ "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
return;
}
// If we were waiting for observing a flush or region opening event for not showing partial
// data after a secondary region crash, we can allow reads now. This event means that the
// primary was not able to flush because memstore is empty when we requested flush. By the
// time we observe this, we are guaranteed to have up to date seqId with our previous
// assignment.
this.setReadsEnabled(true);
}
}
@VisibleForTesting
PrepareFlushResult getPrepareFlushResult() {
return prepareFlushResult;
@ -4429,13 +4548,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return;
}
if (regionEvent.getEventType() != EventType.REGION_OPEN) {
LOG.warn("Unknown region event received, ignoring :"
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Unknown region event received, ignoring :"
+ TextFormat.shortDebugString(regionEvent));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
LOG.debug(getRegionInfo().getEncodedName() + " : "
+ "Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
}
// we will use writestate as a coarse-grain lock for all the replay events
@ -4446,10 +4567,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// region open event's seqid. Since this is the first event that the region puts (after
// possibly flushing recovered.edits), after seeing this event, we can ignore every edit
// smaller than this seqId
if (this.lastReplayedOpenRegionSeqId < regionEvent.getLogSequenceNumber()) {
if (this.lastReplayedOpenRegionSeqId <= regionEvent.getLogSequenceNumber()) {
this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
} else {
LOG.warn("Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
return;
@ -4462,7 +4584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
byte[] family = storeDescriptor.getFamilyName().toByteArray();
Store store = getStore(family);
if (store == null) {
LOG.warn("Received a region open marker from primary, but the family is not found. "
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a region open marker from primary, but the family is not found. "
+ "Ignoring. StoreDescriptor:" + storeDescriptor);
continue;
}
@ -4478,7 +4601,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (writestate.flushing) {
// only drop memstore snapshots if they are smaller than last flush for the store
if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs.get(family);
StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
null : this.prepareFlushResult.storeFlushCtxs.get(family);
if (ctx != null) {
long snapshotSize = store.getFlushableSize();
ctx.abort();
@ -4524,6 +4648,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// either greater than flush seq number or they were already dropped via flush.
getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
// If we were waiting for observing a flush or region opening event for not showing partial
// data after a secondary region crash, we can allow reads now.
this.setReadsEnabled(true);
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
synchronized (this) {
@ -4865,7 +4993,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
// a sequence id that we can be sure is beyond the last hfile written).
if (assignSeqId) {
FlushResult fs = this.flushcache(true);
FlushResult fs = this.flushcache();
if (fs.isFlushSucceeded()) {
seqId = fs.flushSequenceId;
} else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@ -5832,8 +5960,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
FileSystem fs = a.getRegionFileSystem().getFileSystem();
// Make sure each region's cache is empty
a.flushcache(true);
b.flushcache(true);
a.flushcache();
b.flushcache();
// Compact each region so we only have one store file per family
a.compactStores(true);

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
@ -94,6 +95,7 @@ import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@ -128,6 +130,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@ -143,6 +146,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JSONBean;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
@ -313,6 +317,9 @@ public class HRegionServer extends HasThread implements
// RPC client. Used to make the stub above that does region server status checking.
RpcClient rpcClient;
private RpcRetryingCallerFactory rpcRetryingCallerFactory;
private RpcControllerFactory rpcControllerFactory;
private UncaughtExceptionHandler uncaughtExceptionHandler;
// Info server. Default access so can be used by unit tests. REGIONSERVER
@ -369,6 +376,7 @@ public class HRegionServer extends HasThread implements
protected final Sleeper sleeper;
private final int operationTimeout;
private final int shortOperationTimeout;
private final RegionServerAccounting regionServerAccounting;
@ -495,6 +503,10 @@ public class HRegionServer extends HasThread implements
"hbase.regionserver.numregionstoreport", 10);
this.operationTimeout = conf.getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.shortOperationTimeout = conf.getInt(
HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
@ -506,6 +518,9 @@ public class HRegionServer extends HasThread implements
String hostName = rpcServices.isa.getHostName();
serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
// login the zookeeper client principal (if using security)
ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
"hbase.zookeeper.client.kerberos.principal", hostName);
@ -1639,6 +1654,12 @@ public class HRegionServer extends HasThread implements
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
}
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
this.cacheFlusher.start(uncaughtExceptionHandler);
@ -1842,6 +1863,8 @@ public class HRegionServer extends HasThread implements
+ r.getRegionNameAsString());
}
triggerFlushInPrimaryRegion(r);
LOG.debug("Finished post open deploy task for " + r.getRegionNameAsString());
}
@ -1917,6 +1940,30 @@ public class HRegionServer extends HasThread implements
return false;
}
/**
* Trigger a flush in the primary region replica if this region is a secondary replica. Does not
* block this thread. See RegionReplicaFlushHandler for details.
*/
void triggerFlushInPrimaryRegion(final HRegion region) {
if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
return;
}
if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(getConfiguration()) ||
!ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
getConfiguration())) {
region.setReadsEnabled(true);
return;
}
region.setReadsEnabled(false); // disable reads before marking the region as opened.
// RegionReplicaFlushHandler might reset this.
// submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
this.service.submit(
new RegionReplicaFlushHandler(this, clusterConnection,
rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
}
@Override
public RpcServerInterface getRpcServer() {
return rpcServices.rpcServer;
@ -2106,7 +2153,8 @@ public class HRegionServer extends HasThread implements
}
try {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), operationTimeout);
this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
shortOperationTimeout);
intf = RegionServerStatusService.newBlockingStub(channel);
break;
} catch (IOException e) {

View File

@ -2241,6 +2241,8 @@ public class HStore implements Store {
StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
storeFiles.add(storeFile);
HStore.this.storeSize += storeFile.getReader().length();
HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
if (LOG.isInfoEnabled()) {
LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
" added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
@ -2249,7 +2251,10 @@ public class HStore implements Store {
}
}
long snapshotId = dropMemstoreSnapshot ? snapshot.getId() : -1; // -1 means do not drop
long snapshotId = -1; // -1 means do not drop
if (dropMemstoreSnapshot && snapshot != null) {
snapshotId = snapshot.getId();
}
HStore.this.updateStorefiles(storeFiles, snapshotId);
}

View File

@ -724,7 +724,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
if (flushDesc != null && !isDefaultReplica) {
region.replayWALFlushMarker(flushDesc);
region.replayWALFlushMarker(flushDesc, replaySeqId);
continue;
}
RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
@ -1091,18 +1091,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
if (shouldFlush) {
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
request.getWriteFlushWalMarker() : false;
long startTime = EnvironmentEdgeManager.currentTime();
HRegion.FlushResult flushResult = region.flushcache();
HRegion.FlushResult flushResult = region.flushcache(true, writeFlushWalMarker);
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
boolean result = flushResult.isCompactionNeeded();
if (result) {
boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region,
"Compaction through user triggered flush");
}
builder.setFlushed(result);
builder.setFlushed(flushResult.isFlushSucceeded());
builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
}
builder.setLastFlushTime( region.getEarliestFlushTimeForAllStores());
return builder.build();
@ -1460,7 +1463,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
"regions. First region:" + regionName.toStringUtf8() + " , other region:"
+ entry.getKey().getEncodedRegionName());
}
if (regionServer.nonceManager != null) {
if (regionServer.nonceManager != null && isPrimary) {
long nonceGroup = entry.getKey().hasNonceGroup()
? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.handler;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.FlushRegionCallable;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
/**
* HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
* secondary region replicas. This means that a secondary region replica can serve some edits from
* it's memstore that that is still not flushed from primary. We do not want to allow secondary
* region's seqId to go back in time, when this secondary region is opened elsewhere after a
* crash or region move. We will trigger a flush cache in the primary region replica and wait
* for observing a complete flush cycle before marking the region readsEnabled. This handler does
* the flushing of the primary region replica and ensures that regular region opening is not
* blocked while the secondary replica is blocked on flush.
*/
@InterfaceAudience.Private
public class RegionReplicaFlushHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(RegionReplicaFlushHandler.class);
private final ClusterConnection connection;
private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
private final RpcControllerFactory rpcControllerFactory;
private final int operationTimeout;
private final HRegion region;
public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
int operationTimeout, HRegion region) {
super(server, EventType.RS_REGION_REPLICA_FLUSH);
this.connection = connection;
this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
this.rpcControllerFactory = rpcControllerFactory;
this.operationTimeout = operationTimeout;
this.region = region;
}
@Override
public void process() throws IOException {
triggerFlushInPrimaryRegion(region);
}
@Override
protected void handleException(Throwable t) {
super.handleException(t);
if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
// ignore
} else if (t instanceof RuntimeException) {
server.abort("ServerAborting because a runtime exception was thrown", t);
} else {
// something fishy since we cannot flush the primary region until all retries (retries from
// rpc times 35 trigger). We cannot close the region since there is no such mechanism to
// close a region without master triggering it. We just abort the server for now.
server.abort("ServerAborting because an exception was thrown", t);
}
}
private int getRetriesCount(Configuration conf) {
int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
if (numRetries > 10) {
int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10);
numRetries = numRetries / mult; // reset if HRS has multiplied this already
}
return numRetries;
}
void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
int maxAttempts = getRetriesCount(connection.getConfiguration());
RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil
.getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region "
+ region.getRegionInfo().getEncodedName() + " to trigger a flush");
}
while (!region.isClosing() && !region.isClosed()
&& !server.isAborted() && !server.isStopped()) {
FlushRegionCallable flushCallable = new FlushRegionCallable(
connection, rpcControllerFactory,
RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
// TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
// do not have to wait for the whole flush here, just initiate it.
FlushRegionResponse response = null;
try {
response = rpcRetryingCallerFactory.<FlushRegionResponse>newCaller()
.callWithRetries(flushCallable, this.operationTimeout);
} catch (IOException ex) {
if (ex instanceof TableNotFoundException
|| connection.isTableDisabled(region.getRegionInfo().getTable())) {
return;
}
throw ex;
}
if (response.getFlushed()) {
// then we have to wait for seeing the flush entry. All reads will be rejected until we see
// a complete flush cycle or replay a region open event
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully triggered a flush of primary region replica "
+ ServerRegionReplicaUtil
.getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
+ " of region " + region.getRegionInfo().getEncodedName()
+ " Now waiting and blocking reads until observing a full flush cycle");
}
break;
} else {
if (response.hasWroteFlushWalMarker()) {
if(response.getWroteFlushWalMarker()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
+ "region replica " + ServerRegionReplicaUtil
.getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
+ " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
+ "blocking reads until observing a flush marker");
}
break;
} else {
// somehow we were not able to get the primary to write the flush request. It may be
// closing or already flushing. Retry flush again after some sleep.
if (!counter.shouldRetry()) {
throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
"retries. Failing opening of this region replica "
+ region.getRegionInfo().getEncodedName());
}
}
} else {
// nothing to do. Are we dealing with an old server?
LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
+ "Continuing to open the secondary region replica: "
+ region.getRegionInfo().getEncodedName());
region.setReadsEnabled(true);
break;
}
}
try {
counter.sleepUntilNextRetry();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
}
}
}

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
@ -60,7 +58,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
@ -88,6 +85,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class);
// Can be configured differently than hbase.client.retries.number
private static String CLIENT_RETRIES_NUMBER
= "hbase.region.replica.replication.client.retries.number";
private Configuration conf;
private ClusterConnection connection;
@ -109,6 +110,20 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
this.conf = HBaseConfiguration.create(context.getConfiguration());
// HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
// We are resetting it here because we want default number of retries (35) rather than 10 times
// that which makes very long retries for disabled tables etc.
int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
if (defaultNumRetries > 10) {
int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10);
defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
}
conf.setInt("hbase.client.serverside.retries.multiplier", 1);
int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
this.numWriterThreads = this.conf.getInt(
"hbase.region.replica.replication.writer.threads", 3);
controller = new PipelineController();
@ -358,7 +373,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
while (true) {
// get the replicas of the primary region
try {
locations = getRegionLocations(connection, tableName, row, useCache, 0);
locations = RegionReplicaReplayCallable
.getRegionLocations(connection, tableName, row, useCache, 0);
if (locations == null) {
throw new HBaseIOException("Cannot locate locations for "
@ -490,59 +506,21 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
*/
static class RegionReplicaReplayCallable
extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
// replicaId of the region replica that we want to replicate to
private final int replicaId;
private final List<Entry> entries;
private final byte[] initialEncodedRegionName;
private final AtomicLong skippedEntries;
private final RpcControllerFactory rpcControllerFactory;
private boolean skip;
public RegionReplicaReplayCallable(ClusterConnection connection,
RpcControllerFactory rpcControllerFactory, TableName tableName,
HRegionLocation location, HRegionInfo regionInfo, byte[] row,List<Entry> entries,
AtomicLong skippedEntries) {
super(connection, location, tableName, row);
this.replicaId = regionInfo.getReplicaId();
super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
this.entries = entries;
this.rpcControllerFactory = rpcControllerFactory;
this.skippedEntries = skippedEntries;
this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
}
@Override
public HRegionLocation getLocation(boolean useCache) throws IOException {
RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId);
if (rl == null) {
throw new HBaseIOException(getExceptionMessage());
}
location = rl.getRegionLocation(replicaId);
if (location == null) {
throw new HBaseIOException(getExceptionMessage());
}
// check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming form the primary region, filter it out because we do not need it.
// Regions can change because of (1) region split (2) region merge (3) table recreated
if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
initialEncodedRegionName)) {
skip = true;
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ " because located region " + location.getRegionInfo().getEncodedName()
+ " is different than the original region "
+ Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
return null;
}
return location;
}
@Override
public ReplicateWALEntryResponse call(int timeout) throws IOException {
return replayToServer(this.entries, timeout);
@ -550,55 +528,46 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
throws IOException {
if (entries.isEmpty() || skip) {
// check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming form the primary region, filter it out because we do not need it.
// Regions can change because of (1) region split (2) region merge (3) table recreated
boolean skip = false;
if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
initialEncodedRegionName)) {
skip = true;
}
if (!entries.isEmpty() && !skip) {
Entry[] entriesArray = new Entry[entries.size()];
entriesArray = entries.toArray(entriesArray);
// set the region name for the target region replica
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(
entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
try {
PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
controller.setCallTimeout(timeout);
controller.setPriority(tableName);
return stub.replay(controller, p.getFirst());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
if (skip) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ " because located region " + location.getRegionInfo().getEncodedName()
+ " is different than the original region "
+ Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
skippedEntries.addAndGet(entries.size());
return ReplicateWALEntryResponse.newBuilder().build();
}
Entry[] entriesArray = new Entry[entries.size()];
entriesArray = entries.toArray(entriesArray);
// set the region name for the target region replica
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(
entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
try {
PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
controller.setCallTimeout(timeout);
controller.setPriority(tableName);
return stub.replay(controller, p.getFirst());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return ReplicateWALEntryResponse.newBuilder().build();
}
@Override
protected String getExceptionMessage() {
return super.getExceptionMessage() + " table=" + tableName
+ " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row);
}
}
private static RegionLocations getRegionLocations(
ClusterConnection connection, TableName tableName, byte[] row,
boolean useCache, int replicaId)
throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
RegionLocations rl;
try {
rl = connection.locateRegion(tableName, row, useCache, true, replicaId);
} catch (DoNotRetryIOException e) {
throw e;
} catch (RetriesExhaustedException e) {
throw e;
} catch (InterruptedIOException e) {
throw e;
} catch (IOException e) {
throw new RetriesExhaustedException("Can't get the location", e);
}
if (rl == null) {
throw new RetriesExhaustedException("Can't get the locations");
}
return rl;
}
}

View File

@ -55,6 +55,19 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
/**
* Whether or not the secondary region will wait for observing a flush / region open event
* from the primary region via async wal replication before enabling read requests. Since replayed
* edits from async wal replication from primary is not persisted in WAL, the memstore of the
* secondary region might be non-empty at the time of close or crash. For ensuring seqId's not
* "going back in time" in the secondary region replica, this should be enabled. However, in some
* cases the above semantics might be ok for some application classes.
* See HBASE-11580 for more context.
*/
public static final String REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY
= "hbase.region.replica.wait.for.primary.flush";
private static final boolean DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH = true;
/**
* Returns the regionInfo object to use for interacting with the file system.
* @return An HRegionInfo object to interact with the filesystem
@ -122,7 +135,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
* @throws IOException
*/
public static void setupRegionReplicaReplication(Configuration conf) throws IOException {
if (!conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION)) {
if (!isRegionReplicaReplicationEnabled(conf)) {
return;
}
ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
@ -140,6 +153,16 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
}
}
public static boolean isRegionReplicaReplicationEnabled(Configuration conf) {
return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY,
DEFAULT_REGION_REPLICA_REPLICATION);
}
public static boolean isRegionReplicaWaitForPrimaryFlushEnabled(Configuration conf) {
return conf.getBoolean(REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY,
DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH);
}
/**
* Return the peer id used for replicating to secondary region replicas
*/

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@ -2170,6 +2171,25 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
}
public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
int replicaId)
throws IOException {
for (int i = startRow; i < endRow; i++) {
String failMsg = "Failed verification of row :" + i;
byte[] data = Bytes.toBytes(String.valueOf(i));
Get get = new Get(data);
get.setReplicaId(replicaId);
get.setConsistency(Consistency.TIMELINE);
Result result = table.get(get);
assertTrue(failMsg, result.containsColumn(f, null));
assertEquals(failMsg, result.getColumnCells(f, null).size(), 1);
Cell cell = result.getColumnLatestCell(f, null);
assertTrue(failMsg,
Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength()));
}
}
public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
throws IOException {
for (int i = startRow; i < endRow; i++) {

View File

@ -100,7 +100,7 @@ public class TestPrefixTree {
put = new Put(row4_bytes);
put.add(fam, qual2, Bytes.toBytes("c2-value-3"));
region.put(put);
region.flushcache(true);
region.flushcache();
String[] rows = new String[3];
rows[0] = row1;
rows[1] = row2;
@ -182,7 +182,7 @@ public class TestPrefixTree {
region.put(new Put(Bytes.toBytes("obj29")).add(fam, qual1, Bytes.toBytes("whatever")));
region.put(new Put(Bytes.toBytes("obj2")).add(fam, qual1, Bytes.toBytes("whatever")));
region.put(new Put(Bytes.toBytes("obj3")).add(fam, qual1, Bytes.toBytes("whatever")));
region.flushcache(true);
region.flushcache();
Scan scan = new Scan(Bytes.toBytes("obj29995"));
RegionScanner scanner = region.getScanner(scan);
List<Cell> cells = new ArrayList<Cell>();

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@ -312,6 +313,8 @@ public class TestHRegionReplayEvents {
long storeMemstoreSize = store.getMemStoreSize();
long regionMemstoreSize = secondaryRegion.getMemstoreSize().get();
long storeFlushableSize = store.getFlushableSize();
long storeSize = store.getSize();
long storeSizeUncompressed = store.getStoreSizeUncompressed();
if (flushDesc.getAction() == FlushAction.START_FLUSH) {
LOG.info("-- Replaying flush start in secondary");
PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
@ -339,6 +342,11 @@ public class TestHRegionReplayEvents {
// assert that the region memstore is smaller now
long newRegionMemstoreSize = secondaryRegion.getMemstoreSize().get();
assertTrue(regionMemstoreSize > newRegionMemstoreSize);
// assert that the store sizes are bigger
assertTrue(store.getSize() > storeSize);
assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
assertEquals(store.getSize(), store.getStorefilesSize());
}
// after replay verify that everything is still visible
verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
@ -1112,6 +1120,207 @@ public class TestHRegionReplayEvents {
(WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());
}
/**
* Tests the reads enabled flag for the region. When unset all reads should be rejected
*/
@Test
public void testRegionReadsEnabledFlag() throws IOException {
putDataByReplay(secondaryRegion, 0, 100, cq, families);
verifyData(secondaryRegion, 0, 100, cq, families);
// now disable reads
secondaryRegion.setReadsEnabled(false);
try {
verifyData(secondaryRegion, 0, 100, cq, families);
fail("Should have failed with IOException");
} catch(IOException ex) {
// expected
}
// verify that we can still replay data
putDataByReplay(secondaryRegion, 100, 100, cq, families);
// now enable reads again
secondaryRegion.setReadsEnabled(true);
verifyData(secondaryRegion, 0, 200, cq, families);
}
/**
* Tests the case where a request for flush cache is sent to the region, but region cannot flush.
* It should write the flush request marker instead.
*/
@Test
public void testWriteFlushRequestMarker() throws IOException {
// primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
FlushResult result = primaryRegion.flushcache(true, false);
assertNotNull(result);
assertEquals(result.result, FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
assertFalse(result.wroteFlushWalMarker);
// request flush again, but this time with writeFlushRequestWalMarker = true
result = primaryRegion.flushcache(true, true);
assertNotNull(result);
assertEquals(result.result, FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
assertTrue(result.wroteFlushWalMarker);
List<FlushDescriptor> flushes = Lists.newArrayList();
reader = createWALReaderForPrimary();
while (true) {
WAL.Entry entry = reader.next();
if (entry == null) {
break;
}
FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
if (flush != null) {
flushes.add(flush);
}
}
assertEquals(1, flushes.size());
assertNotNull(flushes.get(0));
assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
}
/**
* Test the case where the secondary region replica is not in reads enabled state because it is
* waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH
* flush marker entry should restore the reads enabled status in the region and allow the reads
* to continue.
*/
@Test
public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
disableReads(secondaryRegion);
// Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
// triggered flush restores readsEnabled
primaryRegion.flushcache(true, true);
reader = createWALReaderForPrimary();
while (true) {
WAL.Entry entry = reader.next();
if (entry == null) {
break;
}
FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
if (flush != null) {
secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
}
}
// now reads should be enabled
secondaryRegion.get(new Get(Bytes.toBytes(0)));
}
/**
* Test the case where the secondary region replica is not in reads enabled state because it is
* waiting for a flush or region open marker from primary region. Replaying flush start and commit
* entries should restore the reads enabled status in the region and allow the reads
* to continue.
*/
@Test
public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
// Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
// from triggered flush restores readsEnabled
disableReads(secondaryRegion);
// put some data in primary
putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
primaryRegion.flushcache();
reader = createWALReaderForPrimary();
while (true) {
WAL.Entry entry = reader.next();
if (entry == null) {
break;
}
FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
if (flush != null) {
secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
} else {
replayEdit(secondaryRegion, entry);
}
}
// now reads should be enabled
verifyData(secondaryRegion, 0, 100, cq, families);
}
/**
* Test the case where the secondary region replica is not in reads enabled state because it is
* waiting for a flush or region open marker from primary region. Replaying flush start and commit
* entries should restore the reads enabled status in the region and allow the reads
* to continue.
*/
@Test
public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
// Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
// from triggered flush restores readsEnabled
disableReads(secondaryRegion);
// put some data in primary
putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
primaryRegion.flushcache();
reader = createWALReaderForPrimary();
while (true) {
WAL.Entry entry = reader.next();
if (entry == null) {
break;
}
FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
if (flush != null) {
secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
}
}
// now reads should be enabled
verifyData(secondaryRegion, 0, 100, cq, families);
}
/**
* Test the case where the secondary region replica is not in reads enabled state because it is
* waiting for a flush or region open marker from primary region. Replaying region open event
* entry from primary should restore the reads enabled status in the region and allow the reads
* to continue.
*/
@Test
public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
// Test case 3: Test that replaying region open event markers restores readsEnabled
disableReads(secondaryRegion);
primaryRegion.close();
primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
reader = createWALReaderForPrimary();
while (true) {
WAL.Entry entry = reader.next();
if (entry == null) {
break;
}
RegionEventDescriptor regionEventDesc
= WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
if (regionEventDesc != null) {
secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
}
}
// now reads should be enabled
secondaryRegion.get(new Get(Bytes.toBytes(0)));
}
private void disableReads(HRegion region) {
region.setReadsEnabled(false);
try {
verifyData(region, 0, 1, cq, families);
fail("Should have failed with IOException");
} catch(IOException ex) {
// expected
}
}
private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
put.setDurability(Durability.SKIP_WAL);
MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);

View File

@ -0,0 +1,373 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RpcRetryingCallerImpl;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Tests failover of secondary region replicas.
*/
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestRegionReplicaFailover {
private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
static {
((Log4JLogger)RpcRetryingCallerImpl.LOG).getLogger().setLevel(Level.ALL);
}
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final int NB_SERVERS = 3;
protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
protected final byte[] fam = HBaseTestingUtility.fam1;
protected final byte[] qual1 = Bytes.toBytes("qual1");
protected final byte[] value1 = Bytes.toBytes("value1");
protected final byte[] row = Bytes.toBytes("rowA");
protected final byte[] row2 = Bytes.toBytes("rowB");
@Rule public TestName name = new TestName();
private HTableDescriptor htd;
/*
* We are testing with dist log split and dist log replay separately
*/
@Parameters
public static Collection<Object[]> getParameters() {
Object[][] params =
new Boolean[][] { {false} }; // TODO: enable dist log replay testing after HBASE-13121
return Arrays.asList(params);
}
@Parameterized.Parameter(0)
public boolean distributedLogReplay;
@Before
public void before() throws Exception {
Configuration conf = HTU.getConfiguration();
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
HTU.startMiniCluster(NB_SERVERS);
htd = HTU.createTableDescriptor(
name.getMethodName().substring(0, name.getMethodName().length()-3));
htd.setRegionReplication(3);
HTU.getHBaseAdmin().createTable(htd);
}
@After
public void after() throws Exception {
HTU.deleteTableIfAny(htd.getTableName());
HTU.shutdownMiniCluster();
}
/**
* Tests the case where a newly created table with region replicas and no data, the secondary
* region replicas are available to read immediately.
*/
@Test(timeout = 60000)
public void testSecondaryRegionWithEmptyRegion() throws IOException {
// Create a new table with region replication, don't put any data. Test that the secondary
// region replica is available to read.
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(htd.getTableName())) {
Get get = new Get(row);
get.setConsistency(Consistency.TIMELINE);
get.setReplicaId(1);
table.get(get); // this should not block
}
}
/**
* Tests the case where if there is some data in the primary region, reopening the region replicas
* (enable/disable table, etc) makes the region replicas readable.
* @throws IOException
*/
@Test(timeout = 60000)
public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
// Create a new table with region replication and load some data
// than disable and enable the table again and verify the data from secondary
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(htd.getTableName())) {
HTU.loadNumericRows(table, fam, 0, 1000);
HTU.getHBaseAdmin().disableTable(htd.getTableName());
HTU.getHBaseAdmin().enableTable(htd.getTableName());
HTU.verifyNumericRows(table, fam, 0, 1000, 1);
}
}
/**
* Tests the case where killing a primary region with unflushed data recovers
*/
@Test (timeout = 120000)
public void testPrimaryRegionKill() throws Exception {
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(htd.getTableName())) {
HTU.loadNumericRows(table, fam, 0, 1000);
// wal replication is async, we have to wait until the replication catches up, or we timeout
verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
// we should not have flushed files now, but data in memstores of primary and secondary
// kill the primary region replica now, and ensure that when it comes back up, we can still
// read from it the same data from primary and secondaries
boolean aborted = false;
for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
for (HRegion r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
if (r.getRegionInfo().getReplicaId() == 0) {
LOG.info("Aborting region server hosting primary region replica");
rs.getRegionServer().abort("for test");
aborted = true;
}
}
}
assertTrue(aborted);
// wal replication is async, we have to wait until the replication catches up, or we timeout
verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
}
// restart the region server
HTU.getMiniHBaseCluster().startRegionServer();
}
/** wal replication is async, we have to wait until the replication catches up, or we timeout
*/
private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
final int endRow, final int replicaId, final long timeout) throws Exception {
try {
HTU.waitFor(timeout, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
try {
HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
return true;
} catch (AssertionError ae) {
return false;
}
}
});
} catch (Throwable t) {
// ignore this, but redo the verify do get the actual exception
HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
}
}
/**
* Tests the case where killing a secondary region with unflushed data recovers, and the replica
* becomes available to read again shortly.
*/
@Test (timeout = 120000)
public void testSecondaryRegionKill() throws Exception {
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(htd.getTableName())) {
HTU.loadNumericRows(table, fam, 0, 1000);
// wait for some time to ensure that async wal replication does it's magic
verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
// we should not have flushed files now, but data in memstores of primary and secondary
// kill the secondary region replica now, and ensure that when it comes back up, we can still
// read from it the same data
boolean aborted = false;
for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
for (HRegion r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
if (r.getRegionInfo().getReplicaId() == 1) {
LOG.info("Aborting region server hosting secondary region replica");
rs.getRegionServer().abort("for test");
aborted = true;
}
}
}
assertTrue(aborted);
Threads.sleep(5000);
HTU.verifyNumericRows(table, fam, 0, 1000, 1);
HTU.verifyNumericRows(table, fam, 0, 1000, 2);
}
// restart the region server
HTU.getMiniHBaseCluster().startRegionServer();
}
/**
* Tests the case where there are 3 region replicas and the primary is continuously accepting
* new writes while one of the secondaries is killed. Verification is done for both of the
* secondary replicas.
*/
@Test (timeout = 120000)
public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(htd.getTableName());
Admin admin = connection.getAdmin()) {
// start a thread to do the loading of primary
HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
admin.flush(table.getName());
HTU.loadNumericRows(table, fam, 1000, 2000);
final AtomicReference<Throwable> ex = new AtomicReference<Throwable>(null);
final AtomicBoolean done = new AtomicBoolean(false);
final AtomicInteger key = new AtomicInteger(2000);
Thread loader = new Thread() {
@Override
public void run() {
while (!done.get()) {
try {
HTU.loadNumericRows(table, fam, key.get(), key.get()+1000);
key.addAndGet(1000);
} catch (Throwable e) {
ex.compareAndSet(null, e);
}
}
}
};
loader.start();
Thread aborter = new Thread() {
@Override
public void run() {
try {
boolean aborted = false;
for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
for (HRegion r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
if (r.getRegionInfo().getReplicaId() == 1) {
LOG.info("Aborting region server hosting secondary region replica");
rs.getRegionServer().abort("for test");
aborted = true;
}
}
}
assertTrue(aborted);
} catch (Throwable e) {
ex.compareAndSet(null, e);
}
};
};
aborter.start();
aborter.join();
done.set(true);
loader.join();
assertNull(ex.get());
assertTrue(key.get() > 1000); // assert that the test is working as designed
LOG.info("Loaded up to key :" + key.get());
verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
}
// restart the region server
HTU.getMiniHBaseCluster().startRegionServer();
}
/**
* Tests the case where we are creating a table with a lot of regions and replicas. Opening region
* replicas should not block handlers on RS indefinitely.
*/
@Test (timeout = 120000)
public void testLotsOfRegionReplicas() throws IOException {
int numRegions = NB_SERVERS * 20;
int regionReplication = 10;
String tableName = htd.getTableName().getNameAsString() + "2";
htd = HTU.createTableDescriptor(tableName);
htd.setRegionReplication(regionReplication);
// dont care about splits themselves too much
byte[] startKey = Bytes.toBytes("aaa");
byte[] endKey = Bytes.toBytes("zzz");
byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
HTU.getHBaseAdmin().createTable(htd, startKey, endKey, numRegions);
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(htd.getTableName())) {
for (int i = 1; i < splits.length; i++) {
for (int j = 0; j < regionReplication; j++) {
Get get = new Get(splits[i]);
get.setConsistency(Consistency.TIMELINE);
get.setReplicaId(j);
table.get(get); // this should not block. Regions should be coming online
}
}
}
HTU.deleteTableIfAny(TableName.valueOf(tableName));
}
}

View File

@ -831,11 +831,12 @@ public class TestWALReplay {
new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
@Override
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
Collection<Store> storesToFlush, MonitoredTask status)
throws IOException {
final Collection<Store> storesToFlush, MonitoredTask status,
boolean writeFlushWalMarker)
throws IOException {
LOG.info("InternalFlushCache Invoked");
FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
Mockito.mock(MonitoredTask.class));
Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
flushcount.incrementAndGet();
return fs;
};

View File

@ -98,6 +98,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
Configuration conf = HTU.getConfiguration();
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
// install WALObserver coprocessor for tests
String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);