HBASE-11568 Async WAL replication for region replicas

This commit is contained in:
Enis Soztutar 2014-08-19 18:59:22 -07:00
parent d44e7df5dc
commit e28ec72464
16 changed files with 1637 additions and 194 deletions

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
/**
* Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable
* assumes a Table and row and thus does region locating similar to RegionServerCallable.
*/
public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
protected final ClusterConnection connection;
protected AdminService.BlockingInterface stub;
protected HRegionLocation location;
protected final TableName tableName;
protected final byte[] row;
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
public RegionAdminServiceCallable(ClusterConnection connection, TableName tableName, byte[] row) {
this(connection, null, tableName, row);
}
public RegionAdminServiceCallable(ClusterConnection connection, HRegionLocation location,
TableName tableName, byte[] row) {
this.connection = connection;
this.location = location;
this.tableName = tableName;
this.row = row;
}
@Override
public void prepare(boolean reload) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (reload || location == null) {
location = getLocation(!reload);
}
if (location == null) {
// With this exception, there will be a retry.
throw new HBaseIOException(getExceptionMessage());
}
this.setStub(connection.getAdmin(location.getServerName()));
}
protected void setStub(AdminService.BlockingInterface stub) {
this.stub = stub;
}
public abstract HRegionLocation getLocation(boolean useCache) throws IOException;
@Override
public void throwable(Throwable t, boolean retrying) {
if (t instanceof SocketTimeoutException ||
t instanceof ConnectException ||
t instanceof RetriesExhaustedException ||
(location != null && getConnection().isDeadServer(location.getServerName()))) {
// if thrown these exceptions, we clear all the cache entries that
// map to that slow/dead server; otherwise, let cache miss and ask
// hbase:meta again to find the new location
if (this.location != null) getConnection().clearCaches(location.getServerName());
} else if (t instanceof RegionMovedException) {
getConnection().updateCachedLocations(tableName, row, t, location);
} else if (t instanceof NotServingRegionException) {
// Purge cache entries for this specific region from hbase:meta cache
// since we don't call connect(true) when number of retries is 1.
getConnection().deleteCachedRegionLocation(location);
}
}
/**
* @return {@link HConnection} instance used by this Callable.
*/
HConnection getConnection() {
return this.connection;
}
//subclasses can override this.
protected String getExceptionMessage() {
return "There is no location";
}
@Override
public String getExceptionMessageAdditionalDetail() {
return null;
}
@Override
public long sleep(long pause, int tries) {
long sleep = ConnectionUtils.getPauseTime(pause, tries + 1);
if (sleep < MIN_WAIT_DEAD_SERVER
&& (location == null || connection.isDeadServer(location.getServerName()))) {
sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
}
return sleep;
}
}

View File

@ -48,7 +48,7 @@ import com.google.protobuf.ServiceException;
*/
@InterfaceAudience.Private
public class RpcRetryingCaller<T> {
static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
/**
* When we started making calls.
*/

View File

@ -1389,6 +1389,20 @@ possible configurations would overwhelm and obscure the important.
value is also recommended with this setting.
</description>
</property>
<property>
<name>hbase.region.replica.replication.enabled</name>
<value>false</value>
<description>
Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
If this is enabled, a replication peer named "region_replica_replication" will be created
which will tail the logs and replicate the mutatations to region replicas for tables that
have region replication > 1. If this is enabled once, disabling this replication also
requires disabling the replication peer using shell or ReplicationAdmin java class.
Replication to secondary region replicas works over standard inter-cluster replication.
So replication, if disabled explicitly, also has to be enabled by setting "hbase.replication"
to true for this feature to work.
</description>
</property>
<property>
<name>hbase.http.filter.initializers</name>
<value>org.apache.hadoop.hbase.http.lib.StaticUserWebFilter</value>

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
/**
* Handler to create a table.
@ -196,9 +197,8 @@ public class CreateTableHandler extends EventHandler {
*/
protected void completed(final Throwable exception) {
releaseTableLock();
String msg = exception == null ? null : exception.getMessage();
LOG.info("Table, " + this.hTableDescriptor.getTableName() + ", creation " +
msg == null ? "successful" : "failed. " + msg);
(exception == null ? "successful" : "failed. " + exception));
if (exception != null) {
removeEnablingTable(this.assignmentManager, this.hTableDescriptor.getTableName());
}
@ -243,11 +243,16 @@ public class CreateTableHandler extends EventHandler {
// 5. Add replicas if needed
regionInfos = addReplicas(hTableDescriptor, regionInfos);
// 6. Trigger immediate assignment of the regions in round-robin fashion
// 6. Setup replication for region replicas if needed
if (hTableDescriptor.getRegionReplication() > 1) {
ServerRegionReplicaUtil.setupRegionReplicaReplication(conf);
}
// 7. Trigger immediate assignment of the regions in round-robin fashion
ModifyRegionUtils.assignRegions(assignmentManager, regionInfos);
}
// 6. Set table enabled flag up in zk.
// 8. Set table enabled flag up in zk.
try {
assignmentManager.getTableStateManager().setTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED);

View File

@ -59,7 +59,7 @@ public class ReplicationProtbufUtil {
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
final HLog.Entry[] entries) throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
buildReplicateWALEntryRequest(entries);
buildReplicateWALEntryRequest(entries, null);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
try {
admin.replicateWALEntry(controller, p.getFirst());
@ -77,6 +77,19 @@ public class ReplicationProtbufUtil {
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
return buildReplicateWALEntryRequest(entries, null);
}
/**
* Create a new ReplicateWALEntryRequest from a list of HLog entries
*
* @param entries the HLog entries to be replicated
* @param encodedRegionName alternative region name to use if not null
* @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
* found.
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final HLog.Entry[] entries, byte[] encodedRegionName) {
// Accumulate all the KVs seen in here.
List<List<? extends Cell>> allkvs = new ArrayList<List<? extends Cell>>(entries.length);
int size = 0;
@ -91,7 +104,9 @@ public class ReplicationProtbufUtil {
WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
HLogKey key = entry.getKey();
keyBuilder.setEncodedRegionName(
ByteStringer.wrap(key.getEncodedRegionName()));
ByteStringer.wrap(encodedRegionName == null
? key.getEncodedRegionName()
: encodedRegionName));
keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
keyBuilder.setWriteTime(key.getWriteTime());

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
@ -156,6 +157,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
@ -1358,13 +1360,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// empty input
return ReplicateWALEntryResponse.newBuilder().build();
}
HRegion region = regionServer.getRegionByEncodedName(
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
RegionCoprocessorHost coprocessorHost =
ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
? region.getCoprocessorHost()
: null; // do not invoke coprocessors if this is a secondary region replica
List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
// when tag is enabled, we need tag replay edits with log sequence number
boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
// Skip adding the edits to WAL if this is a secondary region replica
boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
for (WALEntry entry : entries) {
if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
throw new NotServingRegionException("Replay request contains entries from multiple " +
"regions. First region:" + regionName.toStringUtf8() + " , other region:"
+ entry.getKey().getEncodedRegionName());
}
if (regionServer.nonceManager != null) {
long nonceGroup = entry.getKey().hasNonceGroup()
? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
@ -1374,7 +1389,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<HLogKey, WALEdit>();
List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
cells, walEntry, needAddReplayTag);
cells, walEntry, needAddReplayTag, durability);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.

View File

@ -133,6 +133,7 @@ public class HLogSplitter {
// Major subcomponents of the split process.
// These are separated into inner classes to make testing easier.
PipelineController controller;
OutputSink outputSink;
EntryBuffers entryBuffers;
@ -141,14 +142,6 @@ public class HLogSplitter {
private ZooKeeperWatcher watcher;
private CoordinatedStateManager csm;
// If an exception is thrown by one of the other threads, it will be
// stored here.
protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
// Wait/notify for when data has been produced by the reader thread,
// consumed by the reader thread, or an exception occurred
final Object dataAvailable = new Object();
private MonitoredTask status;
// For checking the latest flushed sequence id
@ -184,8 +177,9 @@ public class HLogSplitter {
this.sequenceIdChecker = idChecker;
this.watcher = zkw;
this.csm = csm;
this.controller = new PipelineController();
entryBuffers = new EntryBuffers(
entryBuffers = new EntryBuffers(controller,
this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
128*1024*1024));
@ -196,13 +190,13 @@ public class HLogSplitter {
this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (zkw != null && csm != null && this.distributedLogReplay) {
outputSink = new LogReplayOutputSink(numWriterThreads);
outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
} else {
if (this.distributedLogReplay) {
LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
}
this.distributedLogReplay = false;
outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
}
}
@ -636,22 +630,6 @@ public class HLogSplitter {
}
}
private void writerThreadError(Throwable t) {
thrown.compareAndSet(null, t);
}
/**
* Check for errors in the writer threads. If any is found, rethrow it.
*/
private void checkForErrors() throws IOException {
Throwable thrown = this.thrown.get();
if (thrown == null) return;
if (thrown instanceof IOException) {
throw new IOException(thrown);
} else {
throw new RuntimeException(thrown);
}
}
/**
* Create a new {@link Writer} for writing log splits.
*/
@ -679,6 +657,36 @@ public class HLogSplitter {
return result;
}
/**
* Contains some methods to control WAL-entries producer / consumer interactions
*/
public static class PipelineController {
// If an exception is thrown by one of the other threads, it will be
// stored here.
AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
// Wait/notify for when data has been produced by the writer thread,
// consumed by the reader thread, or an exception occurred
public final Object dataAvailable = new Object();
void writerThreadError(Throwable t) {
thrown.compareAndSet(null, t);
}
/**
* Check for errors in the writer threads. If any is found, rethrow it.
*/
void checkForErrors() throws IOException {
Throwable thrown = this.thrown.get();
if (thrown == null) return;
if (thrown instanceof IOException) {
throw new IOException(thrown);
} else {
throw new RuntimeException(thrown);
}
}
}
/**
* Class which accumulates edits and separates them into a buffer per region
* while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
@ -686,7 +694,9 @@ public class HLogSplitter {
*
* Writer threads then pull region-specific buffers from this class.
*/
class EntryBuffers {
public static class EntryBuffers {
PipelineController controller;
Map<byte[], RegionEntryBuffer> buffers =
new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
@ -698,7 +708,8 @@ public class HLogSplitter {
long totalBuffered = 0;
long maxHeapUsage;
EntryBuffers(long maxHeapUsage) {
public EntryBuffers(PipelineController controller, long maxHeapUsage) {
this.controller = controller;
this.maxHeapUsage = maxHeapUsage;
}
@ -709,7 +720,7 @@ public class HLogSplitter {
* @throws InterruptedException
* @throws IOException
*/
void appendEntry(Entry entry) throws InterruptedException, IOException {
public void appendEntry(Entry entry) throws InterruptedException, IOException {
HLogKey key = entry.getKey();
RegionEntryBuffer buffer;
@ -724,15 +735,15 @@ public class HLogSplitter {
}
// If we crossed the chunk threshold, wait for more space to be available
synchronized (dataAvailable) {
synchronized (controller.dataAvailable) {
totalBuffered += incrHeap;
while (totalBuffered > maxHeapUsage && thrown.get() == null) {
while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
dataAvailable.wait(2000);
controller.dataAvailable.wait(2000);
}
dataAvailable.notifyAll();
controller.dataAvailable.notifyAll();
}
checkForErrors();
controller.checkForErrors();
}
/**
@ -765,16 +776,30 @@ public class HLogSplitter {
}
long size = buffer.heapSize();
synchronized (dataAvailable) {
synchronized (controller.dataAvailable) {
totalBuffered -= size;
// We may unblock writers
dataAvailable.notifyAll();
controller.dataAvailable.notifyAll();
}
}
synchronized boolean isRegionCurrentlyWriting(byte[] region) {
return currentlyWriting.contains(region);
}
public void waitUntilDrained() {
synchronized (controller.dataAvailable) {
while (totalBuffered > 0) {
try {
controller.dataAvailable.wait(2000);
} catch (InterruptedException e) {
LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
Thread.interrupted();
break;
}
}
}
}
}
/**
@ -783,7 +808,7 @@ public class HLogSplitter {
* share a single byte array instance for the table and region name.
* Also tracks memory usage of the accumulated edits.
*/
static class RegionEntryBuffer implements HeapSize {
public static class RegionEntryBuffer implements HeapSize {
long heapInBuffer = 0;
List<Entry> entryBuffer;
TableName tableName;
@ -815,14 +840,30 @@ public class HLogSplitter {
public long heapSize() {
return heapInBuffer;
}
public byte[] getEncodedRegionName() {
return encodedRegionName;
}
public List<Entry> getEntryBuffer() {
return entryBuffer;
}
public TableName getTableName() {
return tableName;
}
}
class WriterThread extends Thread {
public static class WriterThread extends Thread {
private volatile boolean shouldStop = false;
private PipelineController controller;
private EntryBuffers entryBuffers;
private OutputSink outputSink = null;
WriterThread(OutputSink sink, int i) {
WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
super(Thread.currentThread().getName() + "-Writer-" + i);
this.controller = controller;
this.entryBuffers = entryBuffers;
outputSink = sink;
}
@ -832,7 +873,7 @@ public class HLogSplitter {
doRun();
} catch (Throwable t) {
LOG.error("Exiting thread", t);
writerThreadError(t);
controller.writerThreadError(t);
}
}
@ -842,12 +883,12 @@ public class HLogSplitter {
RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
if (buffer == null) {
// No data currently available, wait on some more to show up
synchronized (dataAvailable) {
synchronized (controller.dataAvailable) {
if (shouldStop && !this.outputSink.flush()) {
return;
}
try {
dataAvailable.wait(500);
controller.dataAvailable.wait(500);
} catch (InterruptedException ie) {
if (!shouldStop) {
throw new RuntimeException(ie);
@ -871,9 +912,9 @@ public class HLogSplitter {
}
void finish() {
synchronized (dataAvailable) {
synchronized (controller.dataAvailable) {
shouldStop = true;
dataAvailable.notifyAll();
controller.dataAvailable.notifyAll();
}
}
}
@ -882,7 +923,10 @@ public class HLogSplitter {
* The following class is an abstraction class to provide a common interface to support both
* existing recovered edits file sink and region server WAL edits replay sink
*/
abstract class OutputSink {
public static abstract class OutputSink {
protected PipelineController controller;
protected EntryBuffers entryBuffers;
protected Map<byte[], SinkWriter> writers = Collections
.synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
@ -908,8 +952,10 @@ public class HLogSplitter {
protected List<Path> splits = null;
public OutputSink(int numWriters) {
public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
numThreads = numWriters;
this.controller = controller;
this.entryBuffers = entryBuffers;
}
void setReporter(CancelableProgressable reporter) {
@ -919,9 +965,9 @@ public class HLogSplitter {
/**
* Start the threads that will pump data from the entryBuffers to the output files.
*/
synchronized void startWriterThreads() {
public synchronized void startWriterThreads() {
for (int i = 0; i < numThreads; i++) {
WriterThread t = new WriterThread(this, i);
WriterThread t = new WriterThread(controller, entryBuffers, this, i);
t.start();
writerThreads.add(t);
}
@ -980,34 +1026,34 @@ public class HLogSplitter {
throw iie;
}
}
checkForErrors();
controller.checkForErrors();
LOG.info("Split writers finished");
return (!progress_failed);
}
abstract List<Path> finishWritingAndClose() throws IOException;
public abstract List<Path> finishWritingAndClose() throws IOException;
/**
* @return a map from encoded region ID to the number of edits written out for that region.
*/
abstract Map<byte[], Long> getOutputCounts();
public abstract Map<byte[], Long> getOutputCounts();
/**
* @return number of regions we've recovered
*/
abstract int getNumberOfRecoveredRegions();
public abstract int getNumberOfRecoveredRegions();
/**
* @param buffer A WAL Edit Entry
* @throws IOException
*/
abstract void append(RegionEntryBuffer buffer) throws IOException;
public abstract void append(RegionEntryBuffer buffer) throws IOException;
/**
* WriterThread call this function to help flush internal remaining edits in buffer before close
* @return true when underlying sink has something to flush
*/
protected boolean flush() throws IOException {
public boolean flush() throws IOException {
return false;
}
}
@ -1017,13 +1063,14 @@ public class HLogSplitter {
*/
class LogRecoveredEditsOutputSink extends OutputSink {
public LogRecoveredEditsOutputSink(int numWriters) {
public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
int numWriters) {
// More threads could potentially write faster at the expense
// of causing more disk seeks as the logs are split.
// 3. After a certain setting (probably around 3) the
// process will be bound on the reader in the current
// implementation anyway.
super(numWriters);
super(controller, entryBuffers, numWriters);
}
/**
@ -1031,7 +1078,7 @@ public class HLogSplitter {
* @throws IOException
*/
@Override
List<Path> finishWritingAndClose() throws IOException {
public List<Path> finishWritingAndClose() throws IOException {
boolean isSuccessful = false;
List<Path> result = null;
try {
@ -1247,7 +1294,7 @@ public class HLogSplitter {
}
@Override
void append(RegionEntryBuffer buffer) throws IOException {
public void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
LOG.warn("got an empty buffer, skipping");
@ -1287,7 +1334,7 @@ public class HLogSplitter {
* @return a map from encoded region ID to the number of edits written out for that region.
*/
@Override
Map<byte[], Long> getOutputCounts() {
public Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
synchronized (writers) {
for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
@ -1298,7 +1345,7 @@ public class HLogSplitter {
}
@Override
int getNumberOfRecoveredRegions() {
public int getNumberOfRecoveredRegions() {
return writers.size();
}
}
@ -1306,7 +1353,7 @@ public class HLogSplitter {
/**
* Class wraps the actual writer which writes data out and related statistics
*/
private abstract static class SinkWriter {
public abstract static class SinkWriter {
/* Count of edits written to this path */
long editsWritten = 0;
/* Number of nanos spent writing to this log */
@ -1367,16 +1414,18 @@ public class HLogSplitter {
private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
private boolean hasEditsInDisablingOrDisabledTables = false;
public LogReplayOutputSink(int numWriters) {
super(numWriters);
public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
int numWriters) {
super(controller, entryBuffers, numWriters);
this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
SplitLogManager.DEFAULT_TIMEOUT);
this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
entryBuffers, numWriters);
this.logRecoveredEditsOutputSink.setReporter(reporter);
}
@Override
void append(RegionEntryBuffer buffer) throws IOException {
public void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
LOG.warn("got an empty buffer, skipping");
@ -1692,7 +1741,7 @@ public class HLogSplitter {
}
@Override
protected boolean flush() throws IOException {
public boolean flush() throws IOException {
String curLoc = null;
int curSize = 0;
List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
@ -1712,7 +1761,7 @@ public class HLogSplitter {
if (curSize > 0) {
this.processWorkItems(curLoc, curQueue);
dataAvailable.notifyAll();
controller.dataAvailable.notifyAll();
return true;
}
return false;
@ -1723,7 +1772,7 @@ public class HLogSplitter {
}
@Override
List<Path> finishWritingAndClose() throws IOException {
public List<Path> finishWritingAndClose() throws IOException {
try {
if (!finishWriting()) {
return null;
@ -1798,7 +1847,7 @@ public class HLogSplitter {
}
@Override
Map<byte[], Long> getOutputCounts() {
public Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
synchronized (writers) {
for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
@ -1809,7 +1858,7 @@ public class HLogSplitter {
}
@Override
int getNumberOfRecoveredRegions() {
public int getNumberOfRecoveredRegions() {
return this.recoveredRegions.size();
}
@ -1945,7 +1994,8 @@ public class HLogSplitter {
* @throws IOException
*/
public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {
Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability)
throws IOException {
if (entry == null) {
// return an empty array
@ -1998,6 +2048,9 @@ public class HLogSplitter {
}
((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
}
if (m != null) {
m.setDurability(durability);
}
previousCell = cell;
}

View File

@ -0,0 +1,558 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.PipelineController;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.SinkWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.OutputSink;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ServiceException;
/**
* A {@link ReplicationEndpoint} endpoint which receives the WAL edits from the
* WAL, and sends the edits to replicas of regions.
*/
@InterfaceAudience.Private
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class);
private Configuration conf;
private ClusterConnection connection;
// Reuse HLogSplitter constructs as a WAL pipe
private PipelineController controller;
private RegionReplicaOutputSink outputSink;
private EntryBuffers entryBuffers;
// Number of writer threads
private int numWriterThreads;
private int operationTimeout;
private ExecutorService pool;
@Override
public void init(Context context) throws IOException {
super.init(context);
this.conf = HBaseConfiguration.create(context.getConfiguration());
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
this.numWriterThreads = this.conf.getInt(
"hbase.region.replica.replication.writer.threads", 3);
controller = new PipelineController();
entryBuffers = new EntryBuffers(controller,
this.conf.getInt("hbase.region.replica.replication.buffersize",
128*1024*1024));
// use the regular RPC timeout for replica replication RPC's
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
@Override
protected void doStart() {
try {
connection = (ClusterConnection) HConnectionManager.createConnection(ctx.getConfiguration());
this.pool = getDefaultThreadPool(conf);
outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool,
numWriterThreads, operationTimeout);
outputSink.startWriterThreads();
super.doStart();
} catch (IOException ex) {
LOG.warn("Received exception while creating connection :" + ex);
notifyFailed(ex);
}
}
@Override
protected void doStop() {
if (outputSink != null) {
try {
outputSink.finishWritingAndClose();
} catch (IOException ex) {
LOG.warn("Got exception while trying to close OutputSink");
LOG.warn(ex);
}
}
if (this.pool != null) {
this.pool.shutdownNow();
try {
// wait for 10 sec
boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
if (!shutdown) {
LOG.warn("Failed to shutdown the thread pool after 10 seconds");
}
} catch (InterruptedException e) {
LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException ex) {
LOG.warn("Got exception closing connection :" + ex);
}
}
super.doStop();
}
/**
* Returns a Thread pool for the RPC's to region replicas. Similar to
* Connection's thread pool.
*/
private ExecutorService getDefaultThreadPool(Configuration conf) {
int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16);
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads *
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
Threads.newDaemonThreadFactory(this.getClass().toString() + "-rpc-shared-"));
tpe.allowCoreThreadTimeOut(true);
return tpe;
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
/* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
*
* RRRE relies on batching from two different mechanisms. The first is the batching from
* ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single
* WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most
* "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing).
* Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits
* to the HLogSplitter.EntryBuffers which is a blocking buffer space of up to
* "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
* based on regions.
*
* There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which
* pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink).
* The SinkWriter in this case will send the wal edits to all secondary region replicas in
* parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is
* being written to the sink, another buffer for the same region will not be made available to
* writers ensuring regions edits are not replayed out of order.
*
* The replicate() call won't return until all the buffers are sent and ack'd by the sinks so
* that the replication can assume all edits are persisted. We may be able to do a better
* pipelining between the replication thread and output sinks later if it becomes a bottleneck.
*/
while (this.isRunning()) {
try {
for (Entry entry: replicateContext.getEntries()) {
entryBuffers.appendEntry(entry);
}
outputSink.flush(); // make sure everything is flushed
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (IOException e) {
LOG.warn("Received IOException while trying to replicate"
+ StringUtils.stringifyException(e));
}
}
return false;
}
@Override
public boolean canReplicateToSameCluster() {
return true;
}
@Override
protected WALEntryFilter getScopeWALEntryFilter() {
// we do not care about scope. We replicate everything.
return null;
}
static class RegionReplicaOutputSink extends OutputSink {
private RegionReplicaSinkWriter sinkWriter;
public RegionReplicaOutputSink(PipelineController controller, EntryBuffers entryBuffers,
ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) {
super(controller, entryBuffers, numWriters);
this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
}
@Override
public void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.getEntryBuffer();
if (entries.isEmpty() || entries.get(0).getEdit().getKeyValues().isEmpty()) {
return;
}
sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
entries.get(0).getEdit().getKeyValues().get(0).getRow(), entries);
}
@Override
public boolean flush() throws IOException {
// nothing much to do for now. Wait for the Writer threads to finish up
// append()'ing the data.
entryBuffers.waitUntilDrained();
return super.flush();
}
@Override
public List<Path> finishWritingAndClose() throws IOException {
finishWriting();
return null;
}
@Override
public Map<byte[], Long> getOutputCounts() {
return null; // only used in tests
}
@Override
public int getNumberOfRecoveredRegions() {
return 0;
}
AtomicLong getSkippedEditsCounter() {
return skippedEdits;
}
}
static class RegionReplicaSinkWriter extends SinkWriter {
RegionReplicaOutputSink sink;
ClusterConnection connection;
RpcControllerFactory rpcControllerFactory;
RpcRetryingCallerFactory rpcRetryingCallerFactory;
int operationTimeout;
ExecutorService pool;
Cache<TableName, Boolean> disabledAndDroppedTables;
public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
ExecutorService pool, int operationTimeout) {
this.sink = sink;
this.connection = connection;
this.operationTimeout = operationTimeout;
this.rpcRetryingCallerFactory
= RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
this.pool = pool;
int nonExistentTableCacheExpiryMs = connection.getConfiguration()
.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
// A cache for non existing tables that have a default expiry of 5 sec. This means that if the
// table is created again with the same name, we might miss to replicate for that amount of
// time. But this cache prevents overloading meta requests for every edit from a deleted file.
disabledAndDroppedTables = CacheBuilder.newBuilder()
.expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
.initialCapacity(10)
.maximumSize(1000)
.build();
}
public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
List<Entry> entries) throws IOException {
if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
sink.getSkippedEditsCounter().incrementAndGet();
return;
}
// get the replicas of the primary region
RegionLocations locations = null;
try {
locations = getRegionLocations(connection, tableName, row, true, 0);
if (locations == null) {
throw new HBaseIOException("Cannot locate locations for "
+ tableName + ", row:" + Bytes.toStringBinary(row));
}
} catch (TableNotFoundException e) {
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
// skip this entry
sink.getSkippedEditsCounter().addAndGet(entries.size());
return;
}
if (locations.size() == 1) {
return;
}
ArrayList<Future<ReplicateWALEntryResponse>> tasks
= new ArrayList<Future<ReplicateWALEntryResponse>>(2);
// check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming form the primary region, filter it out.
HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
encodedRegionName)) {
sink.getSkippedEditsCounter().addAndGet(entries.size());
return;
}
// All passed entries should belong to one region because it is coming from the EntryBuffers
// split per region. But the regions might split and merge (unlike log recovery case).
for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
HRegionLocation location = locations.getRegionLocation(replicaId);
if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
HRegionInfo regionInfo = location == null
? RegionReplicaUtil.getRegionInfoForReplica(
locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
: location.getRegionInfo();
RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
rpcControllerFactory, tableName, location, regionInfo, row, entries,
sink.getSkippedEditsCounter());
Future<ReplicateWALEntryResponse> task = pool.submit(
new RetryingRpcCallable<ReplicateWALEntryResponse>(rpcRetryingCallerFactory,
callable, operationTimeout));
tasks.add(task);
}
}
boolean tasksCancelled = false;
for (Future<ReplicateWALEntryResponse> task : tasks) {
try {
task.get();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
// The table can be disabled or dropped at this time. For disabled tables, we have no
// cheap mechanism to detect this case because meta does not contain this information.
// HConnection.isTableDisabled() is a zk call which we cannot do for every replay RPC.
// So instead we start the replay RPC with retries and
// check whether the table is dropped or disabled which might cause
// SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE.
if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
if (!tasksCancelled) {
sink.getSkippedEditsCounter().addAndGet(entries.size());
tasksCancelled = true; // so that we do not add to skipped counter again
}
continue;
}
// otherwise rethrow
throw (IOException)cause;
}
// unexpected exception
throw new IOException(cause);
}
}
}
}
static class RetryingRpcCallable<V> implements Callable<V> {
RpcRetryingCallerFactory factory;
RetryingCallable<V> callable;
int timeout;
public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
int timeout) {
this.factory = factory;
this.callable = callable;
this.timeout = timeout;
}
@Override
public V call() throws Exception {
return factory.<V>newCaller().callWithRetries(callable, timeout);
}
}
/**
* Calls replay on the passed edits for the given set of entries belonging to the region. It skips
* the entry if the region boundaries have changed or the region is gone.
*/
static class RegionReplicaReplayCallable
extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
// replicaId of the region replica that we want to replicate to
private final int replicaId;
private final List<HLog.Entry> entries;
private final byte[] initialEncodedRegionName;
private final AtomicLong skippedEntries;
private final RpcControllerFactory rpcControllerFactory;
private boolean skip;
public RegionReplicaReplayCallable(ClusterConnection connection,
RpcControllerFactory rpcControllerFactory, TableName tableName,
HRegionLocation location, HRegionInfo regionInfo, byte[] row,List<HLog.Entry> entries,
AtomicLong skippedEntries) {
super(connection, location, tableName, row);
this.replicaId = regionInfo.getReplicaId();
this.entries = entries;
this.rpcControllerFactory = rpcControllerFactory;
this.skippedEntries = skippedEntries;
this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
}
@Override
public HRegionLocation getLocation(boolean useCache) throws IOException {
RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId);
if (rl == null) {
throw new HBaseIOException(getExceptionMessage());
}
location = rl.getRegionLocation(replicaId);
if (location == null) {
throw new HBaseIOException(getExceptionMessage());
}
// check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming form the primary region, filter it out because we do not need it.
// Regions can change because of (1) region split (2) region merge (3) table recreated
if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
initialEncodedRegionName)) {
skip = true;
return null;
}
return location;
}
@Override
public ReplicateWALEntryResponse call(int timeout) throws IOException {
return replayToServer(this.entries, timeout);
}
private ReplicateWALEntryResponse replayToServer(List<HLog.Entry> entries, int timeout)
throws IOException {
if (entries.isEmpty() || skip) {
skippedEntries.incrementAndGet();
return ReplicateWALEntryResponse.newBuilder().build();
}
HLog.Entry[] entriesArray = new HLog.Entry[entries.size()];
entriesArray = entries.toArray(entriesArray);
// set the region name for the target region replica
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(
entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
try {
PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
controller.setCallTimeout(timeout);
controller.setPriority(tableName);
return stub.replay(controller, p.getFirst());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
@Override
protected String getExceptionMessage() {
return super.getExceptionMessage() + " table=" + tableName
+ " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row);
}
}
private static RegionLocations getRegionLocations(
ClusterConnection connection, TableName tableName, byte[] row,
boolean useCache, int replicaId)
throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
RegionLocations rl;
try {
rl = connection.locateRegion(tableName, row, useCache, true, replicaId);
} catch (DoNotRetryIOException e) {
throw e;
} catch (RetriesExhaustedException e) {
throw e;
} catch (InterruptedIOException e) {
throw e;
} catch (IOException e) {
throw new RetriesExhaustedException("Can't get the location", e);
}
if (rl == null) {
throw new RetriesExhaustedException("Can't get the locations");
}
return rl;
}
}

View File

@ -706,7 +706,8 @@ public class ReplicationSource extends Thread
}
break;
} catch (Exception ex) {
LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + ex);
LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
org.apache.hadoop.util.StringUtils.stringifyException(ex));
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}

View File

@ -25,16 +25,36 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
/**
* Similar to {@link RegionReplicaUtil} but for the server side
*/
public class ServerRegionReplicaUtil extends RegionReplicaUtil {
/**
* Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
* If this is enabled, a replication peer named "region_replica_replication" will be created
* which will tail the logs and replicate the mutatations to region replicas for tables that
* have region replication > 1. If this is enabled once, disabling this replication also
* requires disabling the replication peer using shell or ReplicationAdmin java class.
* Replication to secondary region replicas works over standard inter-cluster replication.·
* So replication, if disabled explicitly, also has to be enabled by setting "hbase.replication"·
* to true for this feature to work.
*/
public static final String REGION_REPLICA_REPLICATION_CONF_KEY
= "hbase.region.replica.replication.enabled";
private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
/**
* Returns the regionInfo object to use for interacting with the file system.
* @return An HRegionInfo object to interact with the filesystem
@ -96,4 +116,35 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
return new StoreFileInfo(conf, fs, status, link);
}
/**
* Create replication peer for replicating to region replicas if needed.
* @param conf configuration to use
* @throws IOException
*/
public static void setupRegionReplicaReplication(Configuration conf) throws IOException {
if (!conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION)) {
return;
}
ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
try {
if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) {
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf));
peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
}
} catch (ReplicationException ex) {
throw new IOException(ex);
} finally {
repAdmin.close();
}
}
/**
* Return the peer id used for replicating to secondary region replicas
*/
public static String getReplicationPeerId() {
return REGION_REPLICA_REPLICATION_PEER;
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -1532,6 +1534,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
getHBaseAdmin().deleteTable(tableName);
}
/**
* Drop an existing table
* @param tableName existing table
*/
public void deleteTableIfAny(TableName tableName) throws IOException {
try {
deleteTable(tableName);
} catch (TableNotFoundException e) {
// ignore
}
}
// ==========================================================================
// Canned table and table descriptor creation
// TODO replace HBaseTestCase
@ -1846,7 +1860,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return rowCount;
}
public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
public void loadNumericRows(final HTableInterface t, final byte[] f, int startRow, int endRow)
throws IOException {
for (int i = startRow; i < endRow; i++) {
byte[] data = Bytes.toBytes(String.valueOf(i));
Put put = new Put(data);
@ -1855,7 +1870,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
}
public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
throws IOException {
for (int i = startRow; i < endRow; i++) {
String failMsg = "Failed verification of row :" + i;
byte[] data = Bytes.toBytes(String.valueOf(i));
Result result = region.get(new Get(data));
assertTrue(failMsg, result.containsColumn(f, null));
assertEquals(failMsg, result.getColumnCells(f, null).size(), 1);
Cell cell = result.getColumnLatestCell(f, null);
assertTrue(failMsg,
Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength()));
}
}
public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow)
throws IOException {
for (int i = startRow; i < endRow; i++) {
byte[] data = Bytes.toBytes(String.valueOf(i));
Delete delete = new Delete(data);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
@ -32,7 +33,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@ -102,53 +101,9 @@ public class TestRegionReplicas {
return HTU.getMiniHBaseCluster().getRegionServer(0);
}
private void openRegion(HRegionInfo hri) throws Exception {
// first version is '0'
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
Assert.assertTrue(responseOpen.getOpeningState(0).
equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
checkRegionIsOpened(hri.getEncodedName());
}
private void closeRegion(HRegionInfo hri) throws Exception {
AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(getRS().getServerName(),
hri.getEncodedName());
AdminProtos.CloseRegionResponse responseClose = getRS().getRSRpcServices().closeRegion(null, crr);
Assert.assertTrue(responseClose.getClosed());
checkRegionIsClosed(hri.getEncodedName());
}
private void checkRegionIsOpened(String encodedRegionName) throws Exception {
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
Assert.assertTrue(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
}
private void checkRegionIsClosed(String encodedRegionName) throws Exception {
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
try {
Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
} catch (NotServingRegionException expected) {
// That's how it work: if the region is closed we have an exception.
}
// We don't delete the znode here, because there is not always a znode.
}
@Test(timeout = 60000)
public void testOpenRegionReplica() throws Exception {
openRegion(hriSecondary);
openRegion(HTU, getRS(), hriSecondary);
try {
//load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
@ -157,14 +112,14 @@ public class TestRegionReplicas {
Assert.assertEquals(1000, HTU.countRows(table));
} finally {
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(hriSecondary);
closeRegion(HTU, getRS(), hriSecondary);
}
}
/** Tests that the meta location is saved for secondary regions */
@Test(timeout = 60000)
public void testRegionReplicaUpdatesMetaLocation() throws Exception {
openRegion(hriSecondary);
openRegion(HTU, getRS(), hriSecondary);
HTable meta = null;
try {
meta = new HTable(HTU.getConfiguration(), TableName.META_TABLE_NAME);
@ -172,7 +127,7 @@ public class TestRegionReplicas {
, getRS().getServerName(), -1, 1, false);
} finally {
if (meta != null ) meta.close();
closeRegion(hriSecondary);
closeRegion(HTU, getRS(), hriSecondary);
}
}
@ -186,7 +141,7 @@ public class TestRegionReplicas {
// flush so that region replica can read
getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
openRegion(hriSecondary);
openRegion(HTU, getRS(), hriSecondary);
// first try directly against region
HRegion region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
@ -195,7 +150,7 @@ public class TestRegionReplicas {
assertGetRpc(hriSecondary, 42, true);
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
closeRegion(hriSecondary);
closeRegion(HTU, getRS(), hriSecondary);
}
}
@ -209,7 +164,7 @@ public class TestRegionReplicas {
// flush so that region replica can read
getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();
openRegion(hriSecondary);
openRegion(HTU, getRS(), hriSecondary);
// try directly Get against region replica
byte[] row = Bytes.toBytes(String.valueOf(42));
@ -220,7 +175,7 @@ public class TestRegionReplicas {
Assert.assertArrayEquals(row, result.getValue(f, null));
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
closeRegion(hriSecondary);
closeRegion(HTU, getRS(), hriSecondary);
}
}
@ -236,7 +191,8 @@ public class TestRegionReplicas {
}
// build a mock rpc
private void assertGetRpc(HRegionInfo info, int value, boolean expect) throws IOException, ServiceException {
private void assertGetRpc(HRegionInfo info, int value, boolean expect)
throws IOException, ServiceException {
byte[] row = Bytes.toBytes(String.valueOf(value));
Get get = new Get(row);
ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
@ -259,13 +215,14 @@ public class TestRegionReplicas {
// enable store file refreshing
final int refreshPeriod = 2000; // 2 sec
HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod);
HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
refreshPeriod);
// restart the region server so that it starts the refresher chore
restartRegionServer();
try {
LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
openRegion(hriSecondary);
openRegion(HTU, getRS(), hriSecondary);
//load some data to primary
LOG.info("Loading data to primary region");
@ -321,7 +278,7 @@ public class TestRegionReplicas {
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
closeRegion(hriSecondary);
closeRegion(HTU, getRS(), hriSecondary);
}
}
@ -338,7 +295,7 @@ public class TestRegionReplicas {
final int startKey = 0, endKey = 1000;
try {
openRegion(hriSecondary);
openRegion(HTU, getRS(), hriSecondary);
//load some data to primary so that reader won't fail
HTU.loadNumericRows(table, f, startKey, endKey);
@ -402,13 +359,13 @@ public class TestRegionReplicas {
// whether to do a close and open
if (random.nextInt(10) == 0) {
try {
closeRegion(hriSecondary);
closeRegion(HTU, getRS(), hriSecondary);
} catch (Exception ex) {
LOG.warn("Failed closing the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
exceptions[2].compareAndSet(null, ex);
}
try {
openRegion(hriSecondary);
openRegion(HTU, getRS(), hriSecondary);
} catch (Exception ex) {
LOG.warn("Failed opening the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
exceptions[2].compareAndSet(null, ex);
@ -443,7 +400,7 @@ public class TestRegionReplicas {
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
closeRegion(hriSecondary);
closeRegion(HTU, getRS(), hriSecondary);
}
}
}

View File

@ -147,49 +147,55 @@ public class TestRegionServerNoMaster {
}
/**
* Reopen the region. Reused in multiple tests as we always leave the region open after a test.
*/
private void reopenRegion() throws Exception {
public static void openRegion(HBaseTestingUtility HTU, HRegionServer rs, HRegionInfo hri)
throws Exception {
AdminProtos.OpenRegionRequest orr =
RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr);
RequestConverter.buildOpenRegionRequest(rs.getServerName(), hri, null, null);
AdminProtos.OpenRegionResponse responseOpen = rs.rpcServices.openRegion(null, orr);
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
Assert.assertTrue(responseOpen.getOpeningState(0).
equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
checkRegionIsOpened();
checkRegionIsOpened(HTU, rs, hri);
}
private void checkRegionIsOpened() throws Exception {
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
public static void checkRegionIsOpened(HBaseTestingUtility HTU, HRegionServer rs,
HRegionInfo hri) throws Exception {
while (!rs.getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
Assert.assertTrue(getRS().getRegion(regionName).isAvailable());
Assert.assertTrue(rs.getRegion(hri.getRegionName()).isAvailable());
}
public static void closeRegion(HBaseTestingUtility HTU, HRegionServer rs, HRegionInfo hri)
throws Exception {
AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
rs.getServerName(), hri.getEncodedName());
AdminProtos.CloseRegionResponse responseClose = rs.rpcServices.closeRegion(null, crr);
Assert.assertTrue(responseClose.getClosed());
checkRegionIsClosed(HTU, rs, hri);
}
private void checkRegionIsClosed() throws Exception {
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
public static void checkRegionIsClosed(HBaseTestingUtility HTU, HRegionServer rs,
HRegionInfo hri) throws Exception {
while (!rs.getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
try {
Assert.assertFalse(getRS().getRegion(regionName).isAvailable());
Assert.assertFalse(rs.getRegion(hri.getRegionName()).isAvailable());
} catch (NotServingRegionException expected) {
// That's how it work: if the region is closed we have an exception.
}
}
/**
* Close the region without using ZK
*/
private void closeNoZK() throws Exception {
private void closeRegionNoZK() throws Exception {
// no transition in ZK
AdminProtos.CloseRegionRequest crr =
RequestConverter.buildCloseRegionRequest(getRS().getServerName(), regionName);
@ -197,14 +203,14 @@ public class TestRegionServerNoMaster {
Assert.assertTrue(responseClose.getClosed());
// now waiting & checking. After a while, the transition should be done and the region closed
checkRegionIsClosed();
checkRegionIsClosed(HTU, getRS(), hri);
}
@Test(timeout = 60000)
public void testCloseByRegionServer() throws Exception {
closeNoZK();
reopenRegion();
closeRegionNoZK();
openRegion(HTU, getRS(), hri);
}
/**
@ -221,8 +227,8 @@ public class TestRegionServerNoMaster {
public void testMultipleOpen() throws Exception {
// We close
closeNoZK();
checkRegionIsClosed();
closeRegionNoZK();
checkRegionIsClosed(HTU, getRS(), hri);
// We're sending multiple requests in a row. The region server must handle this nicely.
for (int i = 0; i < 10; i++) {
@ -238,7 +244,7 @@ public class TestRegionServerNoMaster {
);
}
checkRegionIsOpened();
checkRegionIsOpened(HTU, getRS(), hri);
}
@Test
@ -279,9 +285,9 @@ public class TestRegionServerNoMaster {
}
}
checkRegionIsClosed();
checkRegionIsClosed(HTU, getRS(), hri);
reopenRegion();
openRegion(HTU, getRS(), hri);
}
/**
@ -290,8 +296,8 @@ public class TestRegionServerNoMaster {
@Test(timeout = 60000)
public void testCancelOpeningWithoutZK() throws Exception {
// We close
closeNoZK();
checkRegionIsClosed();
closeRegionNoZK();
checkRegionIsClosed(HTU, getRS(), hri);
// Let do the initial steps, without having a handler
getRS().getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE);
@ -315,9 +321,9 @@ public class TestRegionServerNoMaster {
getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd));
// The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed();
checkRegionIsClosed(HTU, getRS(), hri);
reopenRegion();
openRegion(HTU, getRS(), hri);
}
/**
@ -341,7 +347,7 @@ public class TestRegionServerNoMaster {
}
//actual close
closeNoZK();
closeRegionNoZK();
try {
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
earlierServerName, hri, null, null);
@ -351,7 +357,7 @@ public class TestRegionServerNoMaster {
Assert.assertTrue(se.getCause() instanceof IOException);
Assert.assertTrue(se.getCause().getMessage().contains("This RPC was intended for a different server"));
} finally {
reopenRegion();
openRegion(HTU, getRS(), hri);
}
}
}

View File

@ -30,13 +30,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.PipelineController;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.mockito.Mockito.mock;
/**
* Simple testing of a few HLog methods.
*/
@ -45,7 +44,7 @@ public class TestHLogMethods {
private static final byte[] TEST_REGION = Bytes.toBytes("test_region");;
private static final TableName TEST_TABLE =
TableName.valueOf("test_table");
private final HBaseTestingUtility util = new HBaseTestingUtility();
/**
@ -108,27 +107,25 @@ public class TestHLogMethods {
reb.appendEntry(createTestLogEntry(1));
assertTrue(reb.heapSize() > 0);
}
@Test
public void testEntrySink() throws Exception {
Configuration conf = new Configuration();
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
HLogSplitter splitter = new HLogSplitter(
conf, mock(Path.class), mock(FileSystem.class), null, null, null, mode);
EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
EntryBuffers sink = new EntryBuffers(new PipelineController(), 1*1024*1024);
for (int i = 0; i < 1000; i++) {
HLog.Entry entry = createTestLogEntry(i);
sink.appendEntry(entry);
}
assertTrue(sink.totalBuffered > 0);
long amountInChunk = sink.totalBuffered;
// Get a chunk
RegionEntryBuffer chunk = sink.getChunkToWrite();
assertEquals(chunk.heapSize(), amountInChunk);
// Make sure it got marked that a thread is "working on this"
assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
@ -136,26 +133,26 @@ public class TestHLogMethods {
for (int i = 0; i < 500; i++) {
HLog.Entry entry = createTestLogEntry(i);
sink.appendEntry(entry);
}
}
// Asking for another chunk shouldn't work since the first one
// is still writing
assertNull(sink.getChunkToWrite());
// If we say we're done writing the first chunk, then we should be able
// to get the second
sink.doneWriting(chunk);
RegionEntryBuffer chunk2 = sink.getChunkToWrite();
assertNotNull(chunk2);
assertNotSame(chunk, chunk2);
long amountInChunk2 = sink.totalBuffered;
// The second chunk had fewer rows than the first
assertTrue(amountInChunk2 < amountInChunk);
sink.doneWriting(chunk2);
assertEquals(0, sink.totalBuffered);
}
private HLog.Entry createTestLogEntry(int i) {
long seq = i;
long now = i * 1000;

View File

@ -0,0 +1,345 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
/**
* Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
* async wal replication replays the edits to the secondary region in various scenarios.
*/
@Category(MediumTests.class)
public class TestRegionReplicaReplicationEndpoint {
private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
static {
((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
}
private static final int NB_SERVERS = 2;
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
@BeforeClass
public static void beforeClass() throws Exception {
Configuration conf = HTU.getConfiguration();
conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
conf.setInt("replication.source.size.capacity", 10240);
conf.setLong("replication.source.sleepforretries", 100);
conf.setInt("hbase.regionserver.maxlogs", 10);
conf.setLong("hbase.master.logcleaner.ttl", 10);
conf.setInt("zookeeper.recovery.retry", 1);
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); // less number of retries is needed
conf.setInt("hbase.client.serverside.retries.multiplier", 1);
HTU.startMiniCluster(NB_SERVERS);
}
@AfterClass
public static void afterClass() throws Exception {
HTU.shutdownMiniCluster();
}
@Test
public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
// create a table with region replicas. Check whether the replication peer is created
// and replication started.
ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
String peerId = "region_replica_replication";
if (admin.getPeerConfig(peerId) != null) {
admin.removePeer(peerId);
}
HTableDescriptor htd = HTU.createTableDescriptor(
"testReplicationPeerIsCreated_no_region_replicas");
HTU.getHBaseAdmin().createTable(htd);
ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId);
assertNull(peerConfig);
htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
htd.setRegionReplication(2);
HTU.getHBaseAdmin().createTable(htd);
// assert peer configuration is correct
peerConfig = admin.getPeerConfig(peerId);
assertNotNull(peerConfig);
assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration()));
assertEquals(peerConfig.getReplicationEndpointImpl(),
RegionReplicaReplicationEndpoint.class.getName());
admin.close();
}
public void testRegionReplicaReplication(int regionReplication) throws Exception {
// test region replica replication. Create a table with single region, write some data
// ensure that data is replicated to the secondary region
TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+ regionReplication);
HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
htd.setRegionReplication(regionReplication);
HTU.getHBaseAdmin().createTable(htd);
TableName tableNameNoReplicas =
TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
HTU.deleteTableIfAny(tableNameNoReplicas);
HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);
HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration());
HTableInterface table = connection.getTable(tableName);
HTableInterface tableNoReplicas = connection.getTable(tableNameNoReplicas);
try {
// load some data to the non-replicated table
HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000);
// load the data to the table
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
table.close();
tableNoReplicas.close();
HTU.deleteTableIfAny(tableNameNoReplicas);
connection.close();
}
}
private void verifyReplication(TableName tableName, int regionReplication,
final int startRow, final int endRow) throws Exception {
// find the regions
final HRegion[] regions = new HRegion[regionReplication];
for (int i=0; i < NB_SERVERS; i++) {
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
List<HRegion> onlineRegions = rs.getOnlineRegions(tableName);
for (HRegion region : onlineRegions) {
regions[region.getRegionInfo().getReplicaId()] = region;
}
}
for (HRegion region : regions) {
assertNotNull(region);
}
for (int i = 1; i < regionReplication; i++) {
final HRegion region = regions[i];
// wait until all the data is replicated to all secondary regions
Waiter.waitFor(HTU.getConfiguration(), 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
LOG.info("verifying replication for region replica:" + region.getRegionInfo());
try {
HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow);
} catch(Throwable ex) {
LOG.warn("Verification from secondary region is not complete yet. Got:" + ex
+ " " + ex.getMessage());
// still wait
return false;
}
return true;
}
});
}
}
@Test(timeout = 60000)
public void testRegionReplicaReplicationWith2Replicas() throws Exception {
testRegionReplicaReplication(2);
}
@Test(timeout = 60000)
public void testRegionReplicaReplicationWith3Replicas() throws Exception {
testRegionReplicaReplication(3);
}
@Test(timeout = 60000)
public void testRegionReplicaReplicationWith10Replicas() throws Exception {
testRegionReplicaReplication(10);
}
@Test (timeout = 60000)
public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
// Tests a table with region replication 3. Writes some data, and causes flushes and
// compactions. Verifies that the data is readable from the replicas. Note that this
// does not test whether the replicas actually pick up flushed files and apply compaction
// to their stores
int regionReplication = 3;
TableName tableName = TableName.valueOf("testRegionReplicaReplicationForFlushAndCompaction");
HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
htd.setRegionReplication(regionReplication);
HTU.getHBaseAdmin().createTable(htd);
HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration());
HTableInterface table = connection.getTable(tableName);
try {
// load the data to the table
for (int i = 0; i < 6000; i += 1000) {
LOG.info("Writing data from " + i + " to " + (i+1000));
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000);
LOG.info("flushing table");
HTU.flush(tableName);
LOG.info("compacting table");
HTU.compact(tableName, false);
}
verifyReplication(tableName, regionReplication, 0, 6000);
} finally {
table.close();
connection.close();
}
}
@Test (timeout = 60000)
public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
testRegionReplicaReplicationIgnoresDisabledTables(false);
}
@Test (timeout = 60000)
public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
testRegionReplicaReplicationIgnoresDisabledTables(true);
}
public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
throws Exception {
// tests having edits from a disabled or dropped table is handled correctly by skipping those
// entries and further edits after the edits from dropped/disabled table can be replicated
// without problems.
TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables"
+ dropTable);
HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
int regionReplication = 3;
htd.setRegionReplication(regionReplication);
HTU.deleteTableIfAny(tableName);
HTU.getHBaseAdmin().createTable(htd);
TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
HTU.deleteTableIfAny(toBeDisabledTable);
htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
htd.setRegionReplication(regionReplication);
HTU.getHBaseAdmin().createTable(htd);
// both tables are created, now pause replication
ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
// now that the replication is disabled, write to the table to be dropped, then drop the table.
HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration());
HTableInterface table = connection.getTable(tableName);
HTableInterface tableToBeDisabled = connection.getTable(toBeDisabledTable);
HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
AtomicLong skippedEdits = new AtomicLong();
RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
(ClusterConnection) connection,
Executors.newSingleThreadExecutor(), 1000);
HRegionLocation hrl = connection.locateRegion(toBeDisabledTable, HConstants.EMPTY_BYTE_ARRAY);
byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
HLog.Entry entry = new HLog.Entry(
new HLogKey(encodedRegionName, toBeDisabledTable, 1),
new WALEdit());
HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table
if (dropTable) {
HTU.getHBaseAdmin().deleteTable(toBeDisabledTable);
}
sinkWriter.append(toBeDisabledTable, encodedRegionName,
HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
assertEquals(2, skippedEdits.get());
try {
// load some data to the to-be-dropped table
// load the data to the table
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
// now enable the replication
admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
admin.close();
table.close();
tableToBeDisabled.close();
HTU.deleteTableIfAny(toBeDisabledTable);
connection.close();
}
}
}

View File

@ -0,0 +1,264 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
/**
* Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
* class contains lower level tests using callables.
*/
@Category(MediumTests.class)
public class TestRegionReplicaReplicationEndpointNoMaster {
private static final Log LOG = LogFactory.getLog(
TestRegionReplicaReplicationEndpointNoMaster.class);
private static final int NB_SERVERS = 2;
private static TableName tableName = TableName.valueOf(
TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
private static HTable table;
private static final byte[] row = "TestRegionReplicaReplicator".getBytes();
private static HRegionServer rs0;
private static HRegionServer rs1;
private static HRegionInfo hriPrimary;
private static HRegionInfo hriSecondary;
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final byte[] f = HConstants.CATALOG_FAMILY;
@BeforeClass
public static void beforeClass() throws Exception {
Configuration conf = HTU.getConfiguration();
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
// install WALObserver coprocessor for tests
String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
if (walCoprocs == null) {
walCoprocs = WALEditCopro.class.getName();
} else {
walCoprocs += "," + WALEditCopro.class.getName();
}
HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
walCoprocs);
HTU.startMiniCluster(NB_SERVERS);
// Create table then get the single region for our new table.
HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
table = HTU.createTable(htd, new byte[][]{f}, HTU.getConfiguration());
hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
// mock a secondary region info to open
hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
// No master
TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
}
@AfterClass
public static void afterClass() throws Exception {
table.close();
HTU.shutdownMiniCluster();
}
@Before
public void before() throws Exception{
entries.clear();
}
@After
public void after() throws Exception {
}
static ConcurrentLinkedQueue<HLog.Entry> entries = new ConcurrentLinkedQueue<HLog.Entry>();
public static class WALEditCopro extends BaseWALObserver {
public WALEditCopro() {
entries.clear();
}
@Override
public void postWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx, HRegionInfo info,
HLogKey logKey, WALEdit logEdit) throws IOException {
// only keep primary region's edits
if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) {
entries.add(new HLog.Entry(logKey, logEdit));
}
}
}
@Test
public void testReplayCallable() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly
openRegion(HTU, rs0, hriSecondary);
ClusterConnection connection =
(ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration());
//load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
// replay the edits to the secondary using replay callable
replicateUsingCallable(connection, entries);
HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
connection.close();
}
private void replicateUsingCallable(ClusterConnection connection, Queue<HLog.Entry> entries)
throws IOException, RuntimeException {
HLog.Entry entry;
while ((entry = entries.poll()) != null) {
byte[] row = entry.getEdit().getKeyValues().get(0).getRow();
RegionLocations locations = connection.locateRegion(tableName, row, true, true);
RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
RpcControllerFactory.instantiate(connection.getConfiguration()),
table.getName(), locations.getRegionLocation(1),
locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry),
new AtomicLong());
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
connection.getConfiguration());
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
}
}
@Test
public void testReplayCallableWithRegionMove() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly while
// the region is moved to another location.It tests handling of RME.
openRegion(HTU, rs0, hriSecondary);
ClusterConnection connection =
(ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration());
//load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
// replay the edits to the secondary using replay callable
replicateUsingCallable(connection, entries);
HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
// move the secondary region from RS0 to RS1
closeRegion(HTU, rs0, hriSecondary);
openRegion(HTU, rs1, hriSecondary);
// replicate the new data
replicateUsingCallable(connection, entries);
region = rs1.getFromOnlineRegions(hriSecondary.getEncodedName());
// verify the new data. old data may or may not be there
HTU.verifyNumericRows(region, f, 1000, 2000);
HTU.deleteNumericRows(table, f, 0, 2000);
closeRegion(HTU, rs1, hriSecondary);
connection.close();
}
@Test
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
// tests replaying the edits to a secondary region replica using the RRRE.replicate()
openRegion(HTU, rs0, hriSecondary);
ClusterConnection connection =
(ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration());
RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
replicator.init(context);
replicator.start();
//load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
// replay the edits to the secondary using replay callable
replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)));
HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
connection.close();
}
}