From 055f5a95e46e55412e05892f86fc0e0e2792356c Mon Sep 17 00:00:00 2001 From: Devaraj Das Date: Fri, 6 Feb 2015 07:48:32 -0800 Subject: [PATCH] HBASE-11568. Async WAL replication for region replicas (Enis Soztutar) --- .../hbase/client/RpcRetryingCaller.java | 2 +- .../src/main/resources/hbase-default.xml | 14 + .../master/handler/CreateTableHandler.java | 13 +- .../protobuf/ReplicationProtbufUtil.java | 20 +- .../hbase/regionserver/RSRpcServices.java | 23 +- .../RegionReplicaReplicationEndpoint.java | 558 ++++++++++++++++++ .../regionserver/ReplicationSource.java | 3 +- .../hbase/util/ServerRegionReplicaUtil.java | 51 ++ .../apache/hadoop/hbase/wal/WALSplitter.java | 206 ++++--- .../hadoop/hbase/HBaseTestingUtility.java | 35 +- .../regionserver/TestRegionReplicas.java | 95 +-- .../TestRegionServerNoMaster.java | 94 +-- .../TestRegionReplicaReplicationEndpoint.java | 345 +++++++++++ ...ionReplicaReplicationEndpointNoMaster.java | 265 +++++++++ .../hadoop/hbase/wal/TestWALMethods.java | 6 +- 15 files changed, 1520 insertions(+), 210 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index b2020bde4b0..896222c8bdb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -49,7 +49,7 @@ import com.google.protobuf.ServiceException; */ @InterfaceAudience.Private public class RpcRetryingCaller { - static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class); + public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class); /** * When we started making calls. */ diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index b10a006ead1..c14bb8bd8af 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1287,6 +1287,20 @@ possible configurations would overwhelm and obscure the important. value is also recommended with this setting. + + hbase.region.replica.replication.enabled + false + + Whether asynchronous WAL replication to the secondary region replicas is enabled or not. + If this is enabled, a replication peer named "region_replica_replication" will be created + which will tail the logs and replicate the mutatations to region replicas for tables that + have region replication > 1. If this is enabled once, disabling this replication also + requires disabling the replication peer using shell or ReplicationAdmin java class. + Replication to secondary region replicas works over standard inter-cluster replication. + So replication, if disabled explicitly, also has to be enabled by setting "hbase.replication" + to true for this feature to work. 
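Editor's note: the description above tells operators to enable the feature cluster-wide and, if they later turn it off, to disable the auto-created peer explicitly; the patch itself does not include a usage example. The following is a minimal, hypothetical client-side sketch of that workflow. The table name "example_table", the column family "f", and setting the two properties on a local Configuration (rather than in the cluster's hbase-site.xml, where they really belong) are illustrative assumptions, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

    public class RegionReplicaReplicationExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Both switches are required; in practice they belong in the cluster's hbase-site.xml
        // and are shown on a local Configuration here only to keep the sketch self-contained.
        conf.setBoolean("hbase.replication", true);
        conf.setBoolean("hbase.region.replica.replication.enabled", true);

        // Creating a table with region replication > 1 is what triggers CreateTableHandler to
        // register the "region_replica_replication" peer (see setupRegionReplicaReplication).
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("example_table"));
        htd.addFamily(new HColumnDescriptor("f"));
        htd.setRegionReplication(2);
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
          admin.createTable(htd);
        } finally {
          admin.close();
        }

        // Turning the configuration property off later is not enough; as the description says,
        // the peer itself has to be disabled (or removed) via the shell or ReplicationAdmin.
        ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
        try {
          replicationAdmin.disablePeer("region_replica_replication");
        } finally {
          replicationAdmin.close();
        }
      }
    }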
+ + hbase.http.filter.initializers org.apache.hadoop.hbase.http.lib.StaticUserWebFilter diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java index a17432cbb91..f9d0d24f86f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; /** * Handler to create a table. @@ -215,9 +216,8 @@ public class CreateTableHandler extends EventHandler { */ protected void completed(final Throwable exception) { releaseTableLock(); - String msg = exception == null ? null : exception.getMessage(); LOG.info("Table, " + this.hTableDescriptor.getTableName() + ", creation " + - msg == null ? "successful" : "failed. " + msg); + (exception == null ? "successful" : "failed. " + exception)); if (exception != null) { removeEnablingTable(this.assignmentManager, this.hTableDescriptor.getTableName()); } @@ -262,11 +262,16 @@ public class CreateTableHandler extends EventHandler { // 5. Add replicas if needed regionInfos = addReplicas(hTableDescriptor, regionInfos); - // 6. Trigger immediate assignment of the regions in round-robin fashion + // 6. Setup replication for region replicas if needed + if (hTableDescriptor.getRegionReplication() > 1) { + ServerRegionReplicaUtil.setupRegionReplicaReplication(conf); + } + + // 7. Trigger immediate assignment of the regions in round-robin fashion ModifyRegionUtils.assignRegions(assignmentManager, regionInfos); } - // 7. Set table enabled flag up in zk. + // 8. Set table enabled flag up in zk. try { assignmentManager.getTableStateManager().setTableState(tableName, ZooKeeperProtos.Table.State.ENABLED); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index d68d247945a..61d1a9a7075 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -59,7 +59,7 @@ public class ReplicationProtbufUtil { public static void replicateWALEntry(final AdminService.BlockingInterface admin, final Entry[] entries) throws IOException { Pair p = - buildReplicateWALEntryRequest(entries); + buildReplicateWALEntryRequest(entries, null); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); try { admin.replicateWALEntry(controller, p.getFirst()); @@ -78,6 +78,20 @@ public class ReplicationProtbufUtil { public static Pair buildReplicateWALEntryRequest(final Entry[] entries) { // Accumulate all the Cells seen in here. + return buildReplicateWALEntryRequest(entries, null); + } + + /** + * Create a new ReplicateWALEntryRequest from a list of HLog entries + * + * @param entries the HLog entries to be replicated + * @param encodedRegionName alternative region name to use if not null + * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values + * found. 
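+ * When {@code encodedRegionName} is non-null, every entry in the request is re-targeted at that
+ * region; this is how the region replica replay path in this patch redirects a primary region's
+ * edits to a secondary replica. When it is null, each entry keeps its original encoded region name.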
+ */ + public static Pair + buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) { + // Accumulate all the KVs seen in here. List> allCells = new ArrayList>(entries.length); int size = 0; WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder(); @@ -91,7 +105,9 @@ public class ReplicationProtbufUtil { WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder(); WALKey key = entry.getKey(); keyBuilder.setEncodedRegionName( - ByteStringer.wrap(key.getEncodedRegionName())); + ByteStringer.wrap(encodedRegionName == null + ? key.getEncodedRegionName() + : encodedRegionName)); keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName())); keyBuilder.setLogSequenceNumber(key.getLogSeqNum()); keyBuilder.setWriteTime(key.getWriteTime()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index b5a23439ab8..f2e5571d1a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; @@ -157,6 +158,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALSplitter; @@ -1452,11 +1454,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // empty input return ReplicateWALEntryResponse.newBuilder().build(); } - HRegion region = regionServer.getRegionByEncodedName( - entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); + ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); + HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); + RegionCoprocessorHost coprocessorHost = + ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) + ? region.getCoprocessorHost() + : null; // do not invoke coprocessors if this is a secondary region replica List> walEntries = new ArrayList>(); + + // Skip adding the edits to WAL if this is a secondary region replica + boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); + Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; + for (WALEntry entry : entries) { + if (!regionName.equals(entry.getKey().getEncodedRegionName())) { + throw new NotServingRegionException("Replay request contains entries from multiple " + + "regions. First region:" + regionName.toStringUtf8() + " , other region:" + + entry.getKey().getEncodedRegionName()); + } if (regionServer.nonceManager != null) { long nonceGroup = entry.getKey().hasNonceGroup() ? 
entry.getKey().getNonceGroup() : HConstants.NO_NONCE; @@ -1466,7 +1481,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Pair walEntry = (coprocessorHost == null) ? null : new Pair(); List edits = WALSplitter.getMutationsFromWALEntry(entry, - cells, walEntry); + cells, walEntry, durability); if (coprocessorHost != null) { // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a // KeyValue. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java new file mode 100644 index 00000000000..3dab12a3dc4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -0,0 +1,558 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.RetryingCallable; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import 
org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; +import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; +import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; +import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.protobuf.ServiceException; + +/** + * A {@link ReplicationEndpoint} endpoint which receives the WAL edits from the + * WAL, and sends the edits to replicas of regions. + */ +@InterfaceAudience.Private +public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { + + private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class); + + private Configuration conf; + private ClusterConnection connection; + + // Reuse WALSplitter constructs as a WAL pipe + private PipelineController controller; + private RegionReplicaOutputSink outputSink; + private EntryBuffers entryBuffers; + + // Number of writer threads + private int numWriterThreads; + + private int operationTimeout; + + private ExecutorService pool; + + @Override + public void init(Context context) throws IOException { + super.init(context); + + this.conf = HBaseConfiguration.create(context.getConfiguration()); + + String codecClassName = conf + .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); + conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); + + this.numWriterThreads = this.conf.getInt( + "hbase.region.replica.replication.writer.threads", 3); + controller = new PipelineController(); + entryBuffers = new EntryBuffers(controller, + this.conf.getInt("hbase.region.replica.replication.buffersize", + 128*1024*1024)); + + // use the regular RPC timeout for replica replication RPC's + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + } + + @Override + protected void doStart() { + try { + connection = (ClusterConnection) HConnectionManager.createConnection(ctx.getConfiguration()); + this.pool = getDefaultThreadPool(conf); + outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool, + numWriterThreads, operationTimeout); + outputSink.startWriterThreads(); + super.doStart(); + } catch (IOException ex) { + LOG.warn("Received exception while creating connection :" + ex); + notifyFailed(ex); + } + } + + @Override + protected void doStop() { + if (outputSink != null) { + try { + outputSink.finishWritingAndClose(); + } catch (IOException ex) { + LOG.warn("Got exception while trying to close OutputSink"); + LOG.warn(ex); + } + } + if (this.pool != null) { + 
this.pool.shutdownNow(); + try { + // wait for 10 sec + boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS); + if (!shutdown) { + LOG.warn("Failed to shutdown the thread pool after 10 seconds"); + } + } catch (InterruptedException e) { + LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e); + } + } + if (connection != null) { + try { + connection.close(); + } catch (IOException ex) { + LOG.warn("Got exception closing connection :" + ex); + } + } + super.doStop(); + } + + /** + * Returns a Thread pool for the RPC's to region replicas. Similar to + * Connection's thread pool. + */ + private ExecutorService getDefaultThreadPool(Configuration conf) { + int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256); + int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16); + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors() * 8; + } + if (coreThreads == 0) { + coreThreads = Runtime.getRuntime().availableProcessors() * 8; + } + long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60); + LinkedBlockingQueue workQueue = + new LinkedBlockingQueue(maxThreads * + conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); + ThreadPoolExecutor tpe = new ThreadPoolExecutor( + coreThreads, + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + workQueue, + Threads.newDaemonThreadFactory(this.getClass().toString() + "-rpc-shared-")); + tpe.allowCoreThreadTimeOut(true); + return tpe; + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + /* A note on batching in RegionReplicaReplicationEndpoint (RRRE): + * + * RRRE relies on batching from two different mechanisms. The first is the batching from + * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single + * WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most + * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing). + * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits + * to the WALSplitter.EntryBuffers which is a blocking buffer space of up to + * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits + * based on regions. + * + * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which + * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink). + * The SinkWriter in this case will send the wal edits to all secondary region replicas in + * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is + * being written to the sink, another buffer for the same region will not be made available to + * writers ensuring regions edits are not replayed out of order. + * + * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so + * that the replication can assume all edits are persisted. We may be able to do a better + * pipelining between the replication thread and output sinks later if it becomes a bottleneck. 
+ */ + + while (this.isRunning()) { + try { + for (Entry entry: replicateContext.getEntries()) { + entryBuffers.appendEntry(entry); + } + outputSink.flush(); // make sure everything is flushed + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } catch (IOException e) { + LOG.warn("Received IOException while trying to replicate" + + StringUtils.stringifyException(e)); + } + } + + return false; + } + + @Override + public boolean canReplicateToSameCluster() { + return true; + } + + @Override + protected WALEntryFilter getScopeWALEntryFilter() { + // we do not care about scope. We replicate everything. + return null; + } + + static class RegionReplicaOutputSink extends OutputSink { + private RegionReplicaSinkWriter sinkWriter; + + public RegionReplicaOutputSink(PipelineController controller, EntryBuffers entryBuffers, + ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) { + super(controller, entryBuffers, numWriters); + this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout); + } + + @Override + public void append(RegionEntryBuffer buffer) throws IOException { + List entries = buffer.getEntryBuffer(); + + if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) { + return; + } + + sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(), + entries.get(0).getEdit().getCells().get(0).getRow(), entries); + } + + @Override + public boolean flush() throws IOException { + // nothing much to do for now. Wait for the Writer threads to finish up + // append()'ing the data. + entryBuffers.waitUntilDrained(); + return super.flush(); + } + + @Override + public List finishWritingAndClose() throws IOException { + finishWriting(); + return null; + } + + @Override + public Map getOutputCounts() { + return null; // only used in tests + } + + @Override + public int getNumberOfRecoveredRegions() { + return 0; + } + + AtomicLong getSkippedEditsCounter() { + return skippedEdits; + } + } + + static class RegionReplicaSinkWriter extends SinkWriter { + RegionReplicaOutputSink sink; + ClusterConnection connection; + RpcControllerFactory rpcControllerFactory; + RpcRetryingCallerFactory rpcRetryingCallerFactory; + int operationTimeout; + ExecutorService pool; + Cache disabledAndDroppedTables; + + public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection, + ExecutorService pool, int operationTimeout) { + this.sink = sink; + this.connection = connection; + this.operationTimeout = operationTimeout; + this.rpcRetryingCallerFactory + = RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); + this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); + this.pool = pool; + + int nonExistentTableCacheExpiryMs = connection.getConfiguration() + .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000); + // A cache for non existing tables that have a default expiry of 5 sec. This means that if the + // table is created again with the same name, we might miss to replicate for that amount of + // time. But this cache prevents overloading meta requests for every edit from a deleted file. 
+ disabledAndDroppedTables = CacheBuilder.newBuilder() + .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS) + .initialCapacity(10) + .maximumSize(1000) + .build(); + } + + public void append(TableName tableName, byte[] encodedRegionName, byte[] row, + List entries) throws IOException { + + if (disabledAndDroppedTables.getIfPresent(tableName) != null) { + sink.getSkippedEditsCounter().incrementAndGet(); + return; + } + + // get the replicas of the primary region + RegionLocations locations = null; + try { + locations = getRegionLocations(connection, tableName, row, true, 0); + + if (locations == null) { + throw new HBaseIOException("Cannot locate locations for " + + tableName + ", row:" + Bytes.toStringBinary(row)); + } + } catch (TableNotFoundException e) { + disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored + // skip this entry + sink.getSkippedEditsCounter().addAndGet(entries.size()); + return; + } + + if (locations.size() == 1) { + return; + } + + ArrayList> tasks + = new ArrayList>(2); + + // check whether we should still replay this entry. If the regions are changed, or the + // entry is not coming form the primary region, filter it out. + HRegionLocation primaryLocation = locations.getDefaultRegionLocation(); + if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(), + encodedRegionName)) { + sink.getSkippedEditsCounter().addAndGet(entries.size()); + return; + } + + + // All passed entries should belong to one region because it is coming from the EntryBuffers + // split per region. But the regions might split and merge (unlike log recovery case). + for (int replicaId = 0; replicaId < locations.size(); replicaId++) { + HRegionLocation location = locations.getRegionLocation(replicaId); + if (!RegionReplicaUtil.isDefaultReplica(replicaId)) { + HRegionInfo regionInfo = location == null + ? RegionReplicaUtil.getRegionInfoForReplica( + locations.getDefaultRegionLocation().getRegionInfo(), replicaId) + : location.getRegionInfo(); + RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection, + rpcControllerFactory, tableName, location, regionInfo, row, entries, + sink.getSkippedEditsCounter()); + Future task = pool.submit( + new RetryingRpcCallable(rpcRetryingCallerFactory, + callable, operationTimeout)); + tasks.add(task); + } + } + + boolean tasksCancelled = false; + for (Future task : tasks) { + try { + task.get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + // The table can be disabled or dropped at this time. For disabled tables, we have no + // cheap mechanism to detect this case because meta does not contain this information. + // HConnection.isTableDisabled() is a zk call which we cannot do for every replay RPC. + // So instead we start the replay RPC with retries and + // check whether the table is dropped or disabled which might cause + // SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE. + if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) { + disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later. 
+ if (!tasksCancelled) { + sink.getSkippedEditsCounter().addAndGet(entries.size()); + tasksCancelled = true; // so that we do not add to skipped counter again + } + continue; + } + // otherwise rethrow + throw (IOException)cause; + } + // unexpected exception + throw new IOException(cause); + } + } + } + } + + static class RetryingRpcCallable implements Callable { + RpcRetryingCallerFactory factory; + RetryingCallable callable; + int timeout; + public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable callable, + int timeout) { + this.factory = factory; + this.callable = callable; + this.timeout = timeout; + } + @Override + public V call() throws Exception { + return factory.newCaller().callWithRetries(callable, timeout); + } + } + + /** + * Calls replay on the passed edits for the given set of entries belonging to the region. It skips + * the entry if the region boundaries have changed or the region is gone. + */ + static class RegionReplicaReplayCallable + extends RegionAdminServiceCallable { + // replicaId of the region replica that we want to replicate to + private final int replicaId; + + private final List entries; + private final byte[] initialEncodedRegionName; + private final AtomicLong skippedEntries; + private final RpcControllerFactory rpcControllerFactory; + private boolean skip; + + public RegionReplicaReplayCallable(ClusterConnection connection, + RpcControllerFactory rpcControllerFactory, TableName tableName, + HRegionLocation location, HRegionInfo regionInfo, byte[] row,List entries, + AtomicLong skippedEntries) { + super(connection, location, tableName, row); + this.replicaId = regionInfo.getReplicaId(); + this.entries = entries; + this.rpcControllerFactory = rpcControllerFactory; + this.skippedEntries = skippedEntries; + this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); + } + + @Override + public HRegionLocation getLocation(boolean useCache) throws IOException { + RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId); + if (rl == null) { + throw new HBaseIOException(getExceptionMessage()); + } + location = rl.getRegionLocation(replicaId); + if (location == null) { + throw new HBaseIOException(getExceptionMessage()); + } + + // check whether we should still replay this entry. If the regions are changed, or the + // entry is not coming form the primary region, filter it out because we do not need it. 
+ // Regions can change because of (1) region split (2) region merge (3) table recreated + if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), + initialEncodedRegionName)) { + skip = true; + return null; + } + + return location; + } + + @Override + public ReplicateWALEntryResponse call(int timeout) throws IOException { + return replayToServer(this.entries, timeout); + } + + private ReplicateWALEntryResponse replayToServer(List entries, int timeout) + throws IOException { + if (entries.isEmpty() || skip) { + skippedEntries.incrementAndGet(); + return ReplicateWALEntryResponse.newBuilder().build(); + } + + Entry[] entriesArray = new Entry[entries.size()]; + entriesArray = entries.toArray(entriesArray); + + // set the region name for the target region replica + Pair p = + ReplicationProtbufUtil.buildReplicateWALEntryRequest( + entriesArray, location.getRegionInfo().getEncodedNameAsBytes()); + try { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); + controller.setCallTimeout(timeout); + controller.setPriority(tableName); + return stub.replay(controller, p.getFirst()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + @Override + protected String getExceptionMessage() { + return super.getExceptionMessage() + " table=" + tableName + + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row); + } + } + + private static RegionLocations getRegionLocations( + ClusterConnection connection, TableName tableName, byte[] row, + boolean useCache, int replicaId) + throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { + RegionLocations rl; + try { + rl = connection.locateRegion(tableName, row, useCache, true, replicaId); + } catch (DoNotRetryIOException e) { + throw e; + } catch (RetriesExhaustedException e) { + throw e; + } catch (InterruptedIOException e) { + throw e; + } catch (IOException e) { + throw new RetriesExhaustedException("Can't get the location", e); + } + if (rl == null) { + throw new RetriesExhaustedException("Can't get the locations"); + } + + return rl; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 6e2ef2d3919..ee43956f482 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -717,7 +717,8 @@ public class ReplicationSource extends Thread } break; } catch (Exception ex) { - LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + ex); + LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + + org.apache.hadoop.util.StringUtils.stringifyException(ex)); if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { sleepMultiplier++; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 7dafa6876fc..cf8721967f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -25,16 +25,36 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; /** * Similar to {@link RegionReplicaUtil} but for the server side */ public class ServerRegionReplicaUtil extends RegionReplicaUtil { + /** + * Whether asynchronous WAL replication to the secondary region replicas is enabled or not. + * If this is enabled, a replication peer named "region_replica_replication" will be created + * which will tail the logs and replicate the mutatations to region replicas for tables that + * have region replication > 1. If this is enabled once, disabling this replication also + * requires disabling the replication peer using shell or ReplicationAdmin java class. + * Replication to secondary region replicas works over standard inter-cluster replication.· + * So replication, if disabled explicitly, also has to be enabled by setting "hbase.replication"· + * to true for this feature to work. + */ + public static final String REGION_REPLICA_REPLICATION_CONF_KEY + = "hbase.region.replica.replication.enabled"; + private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false; + private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication"; + /** * Returns the regionInfo object to use for interacting with the file system. * @return An HRegionInfo object to interact with the filesystem @@ -95,4 +115,35 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { return new StoreFileInfo(conf, fs, status, link); } + /** + * Create replication peer for replicating to region replicas if needed. 
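+ * <p>The peer is created with the id "region_replica_replication" and uses
+ * {@link RegionReplicaReplicationEndpoint} as its replication endpoint. Nothing is done unless
+ * "hbase.region.replica.replication.enabled" is set to true.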
+ * @param conf configuration to use + * @throws IOException + */ + public static void setupRegionReplicaReplication(Configuration conf) throws IOException { + if (!conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION)) { + return; + } + ReplicationAdmin repAdmin = new ReplicationAdmin(conf); + try { + if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) { + ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); + peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf)); + peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()); + repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null); + } + } catch (ReplicationException ex) { + throw new IOException(ex); + } finally { + repAdmin.close(); + } + } + + /** + * Return the peer id used for replicating to secondary region replicas + */ + public static String getReplicationPeerId() { + return REGION_REPLICA_REPLICATION_PEER; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 1744adfc8de..a436f292805 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -151,6 +151,7 @@ public class WALSplitter { // Major subcomponents of the split process. // These are separated into inner classes to make testing easier. + PipelineController controller; OutputSink outputSink; EntryBuffers entryBuffers; @@ -159,14 +160,6 @@ public class WALSplitter { private BaseCoordinatedStateManager csm; private final WALFactory walFactory; - // If an exception is thrown by one of the other threads, it will be - // stored here. - protected AtomicReference thrown = new AtomicReference(); - - // Wait/notify for when data has been produced by the reader thread, - // consumed by the reader thread, or an exception occurred - final Object dataAvailable = new Object(); - private MonitoredTask status; // For checking the latest flushed sequence id @@ -202,8 +195,9 @@ public class WALSplitter { this.sequenceIdChecker = idChecker; this.csm = (BaseCoordinatedStateManager)csm; this.walFactory = factory; + this.controller = new PipelineController(); - entryBuffers = new EntryBuffers( + entryBuffers = new EntryBuffers(controller, this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128*1024*1024)); @@ -214,13 +208,13 @@ public class WALSplitter { this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); if (csm != null && this.distributedLogReplay) { - outputSink = new LogReplayOutputSink(numWriterThreads); + outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads); } else { if (this.distributedLogReplay) { LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly."); } this.distributedLogReplay = false; - outputSink = new LogRecoveredEditsOutputSink(numWriterThreads); + outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads); } } @@ -828,22 +822,6 @@ public class WALSplitter { } } - private void writerThreadError(Throwable t) { - thrown.compareAndSet(null, t); - } - - /** - * Check for errors in the writer threads. If any is found, rethrow it. 
- */ - private void checkForErrors() throws IOException { - Throwable thrown = this.thrown.get(); - if (thrown == null) return; - if (thrown instanceof IOException) { - throw new IOException(thrown); - } else { - throw new RuntimeException(thrown); - } - } /** * Create a new {@link Writer} for writing log splits. * @return a new Writer instance, caller should close @@ -872,6 +850,36 @@ public class WALSplitter { return result; } + /** + * Contains some methods to control WAL-entries producer / consumer interactions + */ + public static class PipelineController { + // If an exception is thrown by one of the other threads, it will be + // stored here. + AtomicReference thrown = new AtomicReference(); + + // Wait/notify for when data has been produced by the writer thread, + // consumed by the reader thread, or an exception occurred + public final Object dataAvailable = new Object(); + + void writerThreadError(Throwable t) { + thrown.compareAndSet(null, t); + } + + /** + * Check for errors in the writer threads. If any is found, rethrow it. + */ + void checkForErrors() throws IOException { + Throwable thrown = this.thrown.get(); + if (thrown == null) return; + if (thrown instanceof IOException) { + throw new IOException(thrown); + } else { + throw new RuntimeException(thrown); + } + } + } + /** * Class which accumulates edits and separates them into a buffer per region * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses @@ -879,7 +887,9 @@ public class WALSplitter { * * Writer threads then pull region-specific buffers from this class. */ - class EntryBuffers { + public static class EntryBuffers { + PipelineController controller; + Map buffers = new TreeMap(Bytes.BYTES_COMPARATOR); @@ -891,7 +901,8 @@ public class WALSplitter { long totalBuffered = 0; long maxHeapUsage; - EntryBuffers(long maxHeapUsage) { + public EntryBuffers(PipelineController controller, long maxHeapUsage) { + this.controller = controller; this.maxHeapUsage = maxHeapUsage; } @@ -902,7 +913,7 @@ public class WALSplitter { * @throws InterruptedException * @throws IOException */ - void appendEntry(Entry entry) throws InterruptedException, IOException { + public void appendEntry(Entry entry) throws InterruptedException, IOException { WALKey key = entry.getKey(); RegionEntryBuffer buffer; @@ -917,15 +928,15 @@ public class WALSplitter { } // If we crossed the chunk threshold, wait for more space to be available - synchronized (dataAvailable) { + synchronized (controller.dataAvailable) { totalBuffered += incrHeap; - while (totalBuffered > maxHeapUsage && thrown.get() == null) { + while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads..."); - dataAvailable.wait(2000); + controller.dataAvailable.wait(2000); } - dataAvailable.notifyAll(); + controller.dataAvailable.notifyAll(); } - checkForErrors(); + controller.checkForErrors(); } /** @@ -958,16 +969,30 @@ public class WALSplitter { } long size = buffer.heapSize(); - synchronized (dataAvailable) { + synchronized (controller.dataAvailable) { totalBuffered -= size; // We may unblock writers - dataAvailable.notifyAll(); + controller.dataAvailable.notifyAll(); } } synchronized boolean isRegionCurrentlyWriting(byte[] region) { return currentlyWriting.contains(region); } + + public void waitUntilDrained() { + synchronized (controller.dataAvailable) { + while (totalBuffered > 0) { + try { + controller.dataAvailable.wait(2000); + } catch (InterruptedException e) 
{ + LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained"); + Thread.interrupted(); + break; + } + } + } + } } /** @@ -976,7 +1001,7 @@ public class WALSplitter { * share a single byte array instance for the table and region name. * Also tracks memory usage of the accumulated edits. */ - static class RegionEntryBuffer implements HeapSize { + public static class RegionEntryBuffer implements HeapSize { long heapInBuffer = 0; List entryBuffer; TableName tableName; @@ -1008,14 +1033,30 @@ public class WALSplitter { public long heapSize() { return heapInBuffer; } + + public byte[] getEncodedRegionName() { + return encodedRegionName; + } + + public List getEntryBuffer() { + return entryBuffer; + } + + public TableName getTableName() { + return tableName; + } } - class WriterThread extends Thread { + public static class WriterThread extends Thread { private volatile boolean shouldStop = false; + private PipelineController controller; + private EntryBuffers entryBuffers; private OutputSink outputSink = null; - WriterThread(OutputSink sink, int i) { + WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){ super(Thread.currentThread().getName() + "-Writer-" + i); + this.controller = controller; + this.entryBuffers = entryBuffers; outputSink = sink; } @@ -1025,7 +1066,7 @@ public class WALSplitter { doRun(); } catch (Throwable t) { LOG.error("Exiting thread", t); - writerThreadError(t); + controller.writerThreadError(t); } } @@ -1035,12 +1076,12 @@ public class WALSplitter { RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); if (buffer == null) { // No data currently available, wait on some more to show up - synchronized (dataAvailable) { + synchronized (controller.dataAvailable) { if (shouldStop && !this.outputSink.flush()) { return; } try { - dataAvailable.wait(500); + controller.dataAvailable.wait(500); } catch (InterruptedException ie) { if (!shouldStop) { throw new RuntimeException(ie); @@ -1064,9 +1105,9 @@ public class WALSplitter { } void finish() { - synchronized (dataAvailable) { + synchronized (controller.dataAvailable) { shouldStop = true; - dataAvailable.notifyAll(); + controller.dataAvailable.notifyAll(); } } } @@ -1075,7 +1116,10 @@ public class WALSplitter { * The following class is an abstraction class to provide a common interface to support both * existing recovered edits file sink and region server WAL edits replay sink */ - abstract class OutputSink { + public static abstract class OutputSink { + + protected PipelineController controller; + protected EntryBuffers entryBuffers; protected Map writers = Collections .synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));; @@ -1101,8 +1145,10 @@ public class WALSplitter { protected List splits = null; - public OutputSink(int numWriters) { + public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) { numThreads = numWriters; + this.controller = controller; + this.entryBuffers = entryBuffers; } void setReporter(CancelableProgressable reporter) { @@ -1112,9 +1158,9 @@ public class WALSplitter { /** * Start the threads that will pump data from the entryBuffers to the output files. 
*/ - synchronized void startWriterThreads() { + public synchronized void startWriterThreads() { for (int i = 0; i < numThreads; i++) { - WriterThread t = new WriterThread(this, i); + WriterThread t = new WriterThread(controller, entryBuffers, this, i); t.start(); writerThreads.add(t); } @@ -1173,34 +1219,34 @@ public class WALSplitter { throw iie; } } - checkForErrors(); + controller.checkForErrors(); LOG.info("Split writers finished"); return (!progress_failed); } - abstract List finishWritingAndClose() throws IOException; + public abstract List finishWritingAndClose() throws IOException; /** * @return a map from encoded region ID to the number of edits written out for that region. */ - abstract Map getOutputCounts(); + public abstract Map getOutputCounts(); /** * @return number of regions we've recovered */ - abstract int getNumberOfRecoveredRegions(); + public abstract int getNumberOfRecoveredRegions(); /** * @param buffer A WAL Edit Entry * @throws IOException */ - abstract void append(RegionEntryBuffer buffer) throws IOException; + public abstract void append(RegionEntryBuffer buffer) throws IOException; /** * WriterThread call this function to help flush internal remaining edits in buffer before close * @return true when underlying sink has something to flush */ - protected boolean flush() throws IOException { + public boolean flush() throws IOException { return false; } } @@ -1210,13 +1256,14 @@ public class WALSplitter { */ class LogRecoveredEditsOutputSink extends OutputSink { - public LogRecoveredEditsOutputSink(int numWriters) { + public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers, + int numWriters) { // More threads could potentially write faster at the expense // of causing more disk seeks as the logs are split. // 3. After a certain setting (probably around 3) the // process will be bound on the reader in the current // implementation anyway. - super(numWriters); + super(controller, entryBuffers, numWriters); } /** @@ -1224,7 +1271,7 @@ public class WALSplitter { * @throws IOException */ @Override - List finishWritingAndClose() throws IOException { + public List finishWritingAndClose() throws IOException { boolean isSuccessful = false; List result = null; try { @@ -1442,7 +1489,7 @@ public class WALSplitter { } @Override - void append(RegionEntryBuffer buffer) throws IOException { + public void append(RegionEntryBuffer buffer) throws IOException { List entries = buffer.entryBuffer; if (entries.isEmpty()) { LOG.warn("got an empty buffer, skipping"); @@ -1483,7 +1530,7 @@ public class WALSplitter { * @return a map from encoded region ID to the number of edits written out for that region. 
*/ @Override - Map getOutputCounts() { + public Map getOutputCounts() { TreeMap ret = new TreeMap(Bytes.BYTES_COMPARATOR); synchronized (writers) { for (Map.Entry entry : writers.entrySet()) { @@ -1494,7 +1541,7 @@ public class WALSplitter { } @Override - int getNumberOfRecoveredRegions() { + public int getNumberOfRecoveredRegions() { return writers.size(); } } @@ -1502,7 +1549,7 @@ public class WALSplitter { /** * Class wraps the actual writer which writes data out and related statistics */ - private abstract static class SinkWriter { + public abstract static class SinkWriter { /* Count of edits written to this path */ long editsWritten = 0; /* Number of nanos spent writing to this log */ @@ -1563,17 +1610,18 @@ public class WALSplitter { private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink; private boolean hasEditsInDisablingOrDisabledTables = false; - public LogReplayOutputSink(int numWriters) { - super(numWriters); - this.waitRegionOnlineTimeOut = - conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, - ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT); - this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters); + public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers, + int numWriters) { + super(controller, entryBuffers, numWriters); + this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, + ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT); + this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller, + entryBuffers, numWriters); this.logRecoveredEditsOutputSink.setReporter(reporter); } @Override - void append(RegionEntryBuffer buffer) throws IOException { + public void append(RegionEntryBuffer buffer) throws IOException { List entries = buffer.entryBuffer; if (entries.isEmpty()) { LOG.warn("got an empty buffer, skipping"); @@ -1889,7 +1937,7 @@ public class WALSplitter { } @Override - protected boolean flush() throws IOException { + public boolean flush() throws IOException { String curLoc = null; int curSize = 0; List> curQueue = null; @@ -1910,8 +1958,8 @@ public class WALSplitter { if (curSize > 0) { this.processWorkItems(curLoc, curQueue); // We should already have control of the monitor; ensure this is the case. - synchronized(dataAvailable) { - dataAvailable.notifyAll(); + synchronized(controller.dataAvailable) { + controller.dataAvailable.notifyAll(); } return true; } @@ -1923,7 +1971,7 @@ public class WALSplitter { } @Override - List finishWritingAndClose() throws IOException { + public List finishWritingAndClose() throws IOException { try { if (!finishWriting()) { return null; @@ -1998,7 +2046,7 @@ public class WALSplitter { } @Override - Map getOutputCounts() { + public Map getOutputCounts() { TreeMap ret = new TreeMap(Bytes.BYTES_COMPARATOR); synchronized (writers) { for (Map.Entry entry : writers.entrySet()) { @@ -2009,7 +2057,7 @@ public class WALSplitter { } @Override - int getNumberOfRecoveredRegions() { + public int getNumberOfRecoveredRegions() { return this.recoveredRegions.size(); } @@ -2115,12 +2163,13 @@ public class WALSplitter { * @param cells * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances * extracted from the passed in WALEntry. 
+ * @param durability * @return list of Pair to be replayed * @throws IOException */ public static List getMutationsFromWALEntry(WALEntry entry, CellScanner cells, - Pair logEntry) throws IOException { - + Pair logEntry, Durability durability) + throws IOException { if (entry == null) { // return an empty array return new ArrayList(); @@ -2168,6 +2217,9 @@ public class WALSplitter { } else { ((Put) m).add(cell); } + if (m != null) { + m.setDurability(durability); + } previousCell = cell; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index b6e7df99114..9ed121ddfb7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -63,6 +64,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; @@ -1773,6 +1775,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { getHBaseAdmin().deleteTable(tableName); } + /** + * Drop an existing table + * @param tableName existing table + */ + public void deleteTableIfAny(TableName tableName) throws IOException { + try { + deleteTable(tableName); + } catch (TableNotFoundException e) { + // ignore + } + } + // ========================================================================== // Canned table and table descriptor creation // TODO replace HBaseTestCase @@ -2085,7 +2099,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return rowCount; } - public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) throws IOException { + public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) + throws IOException { for (int i = startRow; i < endRow; i++) { byte[] data = Bytes.toBytes(String.valueOf(i)); Put put = new Put(data); @@ -2094,7 +2109,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } } - public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) throws IOException { + public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) + throws IOException { + for (int i = startRow; i < endRow; i++) { + String failMsg = "Failed verification of row :" + i; + byte[] data = Bytes.toBytes(String.valueOf(i)); + Result result = region.get(new Get(data)); + assertTrue(failMsg, result.containsColumn(f, null)); + assertEquals(failMsg, result.getColumnCells(f, null).size(), 1); + Cell cell = result.getColumnLatestCell(f, null); + assertTrue(failMsg, + Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength())); + } + } + + public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow) + throws IOException { for (int i = startRow; i < endRow; i++) { byte[] data = Bytes.toBytes(String.valueOf(i)); Delete delete = new Delete(data); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java index afeec4dcdf1..9220a49ccc4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*; import java.io.IOException; import java.util.Random; import java.util.concurrent.ExecutorService; @@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TestMetaTableAccessor; import org.apache.hadoop.hbase.client.Consistency; @@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -121,61 +120,9 @@ public class TestRegionReplicas { return HTU.getMiniHBaseCluster().getRegionServer(0); } - private void openRegion(HRegionInfo hri) throws Exception { - ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); - // first version is '0' - AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null, null); - AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr); - Assert.assertTrue(responseOpen.getOpeningStateCount() == 1); - Assert.assertTrue(responseOpen.getOpeningState(0). 
- equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED)); - checkRegionIsOpened(hri.getEncodedName()); - } - - private void closeRegion(HRegionInfo hri) throws Exception { - ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); - - AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(getRS().getServerName(), - hri.getEncodedName(), true); - AdminProtos.CloseRegionResponse responseClose = getRS().getRSRpcServices().closeRegion(null, crr); - Assert.assertTrue(responseClose.getClosed()); - - checkRegionIsClosed(hri.getEncodedName()); - - ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), getRS().getServerName()); - } - - private void checkRegionIsOpened(String encodedRegionName) throws Exception { - - while (!getRS().getRegionsInTransitionInRS().isEmpty()) { - Thread.sleep(1); - } - - Assert.assertTrue(getRS().getRegionByEncodedName(encodedRegionName).isAvailable()); - - Assert.assertTrue( - ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), encodedRegionName, getRS().getServerName())); - } - - - private void checkRegionIsClosed(String encodedRegionName) throws Exception { - - while (!getRS().getRegionsInTransitionInRS().isEmpty()) { - Thread.sleep(1); - } - - try { - Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable()); - } catch (NotServingRegionException expected) { - // That's how it work: if the region is closed we have an exception. - } - - // We don't delete the znode here, because there is not always a znode. - } - @Test(timeout = 60000) public void testOpenRegionReplica() throws Exception { - openRegion(hriSecondary); + openRegion(HTU, getRS(), hriSecondary); try { //load some data to primary HTU.loadNumericRows(table, f, 0, 1000); @@ -184,22 +131,22 @@ public class TestRegionReplicas { Assert.assertEquals(1000, HTU.countRows(table)); } finally { HTU.deleteNumericRows(table, f, 0, 1000); - closeRegion(hriSecondary); + closeRegion(HTU, getRS(), hriSecondary); } } /** Tests that the meta location is saved for secondary regions */ @Test(timeout = 60000) public void testRegionReplicaUpdatesMetaLocation() throws Exception { - openRegion(hriSecondary); + openRegion(HTU, getRS(), hriSecondary); Table meta = null; try { - meta = new HTable(HTU.getConfiguration(), TableName.META_TABLE_NAME); + meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME); TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName() , getRS().getServerName(), -1, 1, false); } finally { if (meta != null ) meta.close(); - closeRegion(hriSecondary); + closeRegion(HTU, getRS(), hriSecondary); } } @@ -213,7 +160,7 @@ public class TestRegionReplicas { // flush so that region replica can read getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache(); - openRegion(hriSecondary); + openRegion(HTU, getRS(), hriSecondary); // first try directly against region HRegion region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName()); @@ -222,7 +169,7 @@ public class TestRegionReplicas { assertGetRpc(hriSecondary, 42, true); } finally { HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); - closeRegion(hriSecondary); + closeRegion(HTU, getRS(), hriSecondary); } } @@ -236,7 +183,7 @@ public class TestRegionReplicas { // flush so that region replica can read getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache(); - openRegion(hriSecondary); + openRegion(HTU, getRS(), hriSecondary); // try directly Get against region replica byte[] row = 
Bytes.toBytes(String.valueOf(42)); @@ -247,7 +194,7 @@ public class TestRegionReplicas { Assert.assertArrayEquals(row, result.getValue(f, null)); } finally { HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); - closeRegion(hriSecondary); + closeRegion(HTU, getRS(), hriSecondary); } } @@ -263,7 +210,8 @@ public class TestRegionReplicas { } // build a mock rpc - private void assertGetRpc(HRegionInfo info, int value, boolean expect) throws IOException, ServiceException { + private void assertGetRpc(HRegionInfo info, int value, boolean expect) + throws IOException, ServiceException { byte[] row = Bytes.toBytes(String.valueOf(value)); Get get = new Get(row); ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get); @@ -286,13 +234,14 @@ public class TestRegionReplicas { // enable store file refreshing final int refreshPeriod = 2000; // 2 sec HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100); - HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod); + HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, + refreshPeriod); // restart the region server so that it starts the refresher chore restartRegionServer(); try { LOG.info("Opening the secondary region " + hriSecondary.getEncodedName()); - openRegion(hriSecondary); + openRegion(HTU, getRS(), hriSecondary); //load some data to primary LOG.info("Loading data to primary region"); @@ -348,7 +297,7 @@ public class TestRegionReplicas { } finally { HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); - closeRegion(hriSecondary); + closeRegion(HTU, getRS(), hriSecondary); } } @@ -365,7 +314,7 @@ public class TestRegionReplicas { final int startKey = 0, endKey = 1000; try { - openRegion(hriSecondary); + openRegion(HTU, getRS(), hriSecondary); //load some data to primary so that reader won't fail HTU.loadNumericRows(table, f, startKey, endKey); @@ -429,13 +378,13 @@ public class TestRegionReplicas { // whether to do a close and open if (random.nextInt(10) == 0) { try { - closeRegion(hriSecondary); + closeRegion(HTU, getRS(), hriSecondary); } catch (Exception ex) { LOG.warn("Failed closing the region " + hriSecondary + " " + StringUtils.stringifyException(ex)); exceptions[2].compareAndSet(null, ex); } try { - openRegion(hriSecondary); + openRegion(HTU, getRS(), hriSecondary); } catch (Exception ex) { LOG.warn("Failed opening the region " + hriSecondary + " " + StringUtils.stringifyException(ex)); exceptions[2].compareAndSet(null, ex); @@ -469,7 +418,7 @@ public class TestRegionReplicas { } } finally { HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey); - closeRegion(hriSecondary); + closeRegion(HTU, getRS(), hriSecondary); } } @@ -481,7 +430,7 @@ public class TestRegionReplicas { try { LOG.info("Opening the secondary region " + hriSecondary.getEncodedName()); - openRegion(hriSecondary); + openRegion(HTU, getRS(), hriSecondary); // load some data to primary LOG.info("Loading data to primary region"); @@ -528,7 +477,7 @@ public class TestRegionReplicas { Assert.assertEquals(4498500, sum); } finally { HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); - closeRegion(hriSecondary); + closeRegion(HTU, getRS(), hriSecondary); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java index 
381feb7c438..11fa9384454 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java @@ -133,59 +133,69 @@ public class TestRegionServerNoMaster { return HTU.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer(); } - - /** - * Reopen the region. Reused in multiple tests as we always leave the region open after a test. - */ - private void reopenRegion() throws Exception { - // We reopen. We need a ZK node here, as a open is always triggered by a master. - ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); + public static void openRegion(HBaseTestingUtility HTU, HRegionServer rs, HRegionInfo hri) + throws Exception { + ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, rs.getServerName()); // first version is '0' AdminProtos.OpenRegionRequest orr = - RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null, null); - AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr); + RequestConverter.buildOpenRegionRequest(rs.getServerName(), hri, 0, null, null); + AdminProtos.OpenRegionResponse responseOpen = rs.rpcServices.openRegion(null, orr); + Assert.assertTrue(responseOpen.getOpeningStateCount() == 1); Assert.assertTrue(responseOpen.getOpeningState(0). equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED)); - checkRegionIsOpened(); + checkRegionIsOpened(HTU, rs, hri); } - private void checkRegionIsOpened() throws Exception { - - while (!getRS().getRegionsInTransitionInRS().isEmpty()) { + public static void checkRegionIsOpened(HBaseTestingUtility HTU, HRegionServer rs, + HRegionInfo hri) throws Exception { + while (!rs.getRegionsInTransitionInRS().isEmpty()) { Thread.sleep(1); } - Assert.assertTrue(getRS().getRegion(regionName).isAvailable()); + Assert.assertTrue(rs.getRegion(hri.getRegionName()).isAvailable()); Assert.assertTrue( ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), - getRS().getServerName())); + rs.getServerName())); } + public static void closeRegion(HBaseTestingUtility HTU, HRegionServer rs, HRegionInfo hri) + throws Exception { + ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, rs.getServerName()); + AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest( + rs.getServerName(), hri.getEncodedName(), true); + AdminProtos.CloseRegionResponse responseClose = rs.rpcServices.closeRegion(null, crr); + Assert.assertTrue(responseClose.getClosed()); + checkRegionIsClosed(HTU, rs, hri); + ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null); + } - private void checkRegionIsClosed() throws Exception { - - while (!getRS().getRegionsInTransitionInRS().isEmpty()) { + public static void checkRegionIsClosed(HBaseTestingUtility HTU, HRegionServer rs, + HRegionInfo hri) throws Exception { + while (!rs.getRegionsInTransitionInRS().isEmpty()) { Thread.sleep(1); } + boolean exception = false; try { - Assert.assertFalse(getRS().getRegion(regionName).isAvailable()); + while ((rs.getRegion(hri.getRegionName()).isAvailable())) { + Thread.sleep(10); + } } catch (NotServingRegionException expected) { + exception = true; // That's how it work: if the region is closed we have an exception. } - + assert(exception); // We don't delete the znode here, because there is not always a znode. 
} - /** * Close the region without using ZK */ - private void closeNoZK() throws Exception { + private void closeRegionNoZK() throws Exception { // no transition in ZK AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(getRS().getServerName(), regionName, false); @@ -193,14 +203,14 @@ public class TestRegionServerNoMaster { Assert.assertTrue(responseClose.getClosed()); // now waiting & checking. After a while, the transition should be done and the region closed - checkRegionIsClosed(); + checkRegionIsClosed(HTU, getRS(), hri); } @Test(timeout = 60000) public void testCloseByRegionServer() throws Exception { - closeNoZK(); - reopenRegion(); + closeRegionNoZK(); + openRegion(HTU, getRS(), hri); } @Test(timeout = 60000) @@ -231,12 +241,12 @@ public class TestRegionServerNoMaster { AdminProtos.CloseRegionResponse responseClose = getRS().rpcServices.closeRegion(null, crr); Assert.assertTrue(responseClose.getClosed()); - checkRegionIsClosed(); + checkRegionIsClosed(HTU, getRS(), hri); ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), getRS().getServerName()); - reopenRegion(); + openRegion(HTU, getRS(), hri); } /** @@ -253,8 +263,8 @@ public class TestRegionServerNoMaster { public void testMultipleOpen() throws Exception { // We close - closeNoZK(); - checkRegionIsClosed(); + closeRegionNoZK(); + checkRegionIsClosed(HTU, getRS(), hri); // We reopen. We need a ZK node here, as a open is always triggered by a master. ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); @@ -273,7 +283,7 @@ public class TestRegionServerNoMaster { ); } - checkRegionIsOpened(); + checkRegionIsOpened(HTU, getRS(), hri); } @Test @@ -317,14 +327,14 @@ public class TestRegionServerNoMaster { } } - checkRegionIsClosed(); + checkRegionIsClosed(HTU, getRS(), hri); Assert.assertTrue( ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), getRS().getServerName()) ); - reopenRegion(); + openRegion(HTU, getRS(), hri); } /** @@ -333,8 +343,8 @@ public class TestRegionServerNoMaster { @Test(timeout = 60000) public void testCancelOpeningWithoutZK() throws Exception { // We close - closeNoZK(); - checkRegionIsClosed(); + closeRegionNoZK(); + checkRegionIsClosed(HTU, getRS(), hri); // Let do the initial steps, without having a handler ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); @@ -369,7 +379,7 @@ public class TestRegionServerNoMaster { csm.getOpenRegionCoordination(), zkCrd)); // The open handler should have removed the region from RIT but kept the region closed - checkRegionIsClosed(); + checkRegionIsClosed(HTU, getRS(), hri); // The open handler should have updated the value in ZK. 
Assert.assertTrue(ZKAssign.deleteNode( @@ -377,7 +387,7 @@ public class TestRegionServerNoMaster { EventType.RS_ZK_REGION_FAILED_OPEN, 1) ); - reopenRegion(); + openRegion(HTU, getRS(), hri); } /** @@ -387,8 +397,8 @@ public class TestRegionServerNoMaster { @Test(timeout = 60000) public void testCancelOpeningWithZK() throws Exception { // We close - closeNoZK(); - checkRegionIsClosed(); + closeRegionNoZK(); + checkRegionIsClosed(HTU, getRS(), hri); // Let do the initial steps, without having a handler getRS().getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE); @@ -434,12 +444,12 @@ public class TestRegionServerNoMaster { csm.getOpenRegionCoordination(), zkCrd)); // The open handler should have removed the region from RIT but kept the region closed - checkRegionIsClosed(); + checkRegionIsClosed(HTU, getRS(), hri); // We should not find any znode here. Assert.assertEquals(-1, ZKAssign.getVersion(HTU.getZooKeeperWatcher(), hri)); - reopenRegion(); + openRegion(HTU, getRS(), hri); } /** @@ -463,7 +473,7 @@ public class TestRegionServerNoMaster { } //actual close - closeNoZK(); + closeRegionNoZK(); try { AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest( earlierServerName, hri, 0, null, null); @@ -473,7 +483,7 @@ public class TestRegionServerNoMaster { Assert.assertTrue(se.getCause() instanceof IOException); Assert.assertTrue(se.getCause().getMessage().contains("This RPC was intended for a different server")); } finally { - reopenRegion(); + openRegion(HTU, getRS(), hri); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java new file mode 100644 index 00000000000..76945d7e1b8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.log4j.Level; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying + * async wal replication replays the edits to the secondary region in various scenarios. 
+ */ +@Category(MediumTests.class) +public class TestRegionReplicaReplicationEndpoint { + + private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class); + + static { + ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL); + } + + private static final int NB_SERVERS = 2; + + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + Configuration conf = HTU.getConfiguration(); + conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); + conf.setInt("replication.source.size.capacity", 10240); + conf.setLong("replication.source.sleepforretries", 100); + conf.setInt("hbase.regionserver.maxlogs", 10); + conf.setLong("hbase.master.logcleaner.ttl", 10); + conf.setInt("zookeeper.recovery.retry", 1); + conf.setInt("zookeeper.recovery.retry.intervalmill", 10); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); + conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf.setInt("replication.stats.thread.period.seconds", 5); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); // less number of retries is needed + conf.setInt("hbase.client.serverside.retries.multiplier", 1); + + HTU.startMiniCluster(NB_SERVERS); + } + + @AfterClass + public static void afterClass() throws Exception { + HTU.shutdownMiniCluster(); + } + + @Test + public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { + // create a table with region replicas. Check whether the replication peer is created + // and replication started. + ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); + String peerId = "region_replica_replication"; + + if (admin.getPeerConfig(peerId) != null) { + admin.removePeer(peerId); + } + + HTableDescriptor htd = HTU.createTableDescriptor( + "testReplicationPeerIsCreated_no_region_replicas"); + HTU.getHBaseAdmin().createTable(htd); + ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId); + assertNull(peerConfig); + + htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); + htd.setRegionReplication(2); + HTU.getHBaseAdmin().createTable(htd); + + // assert peer configuration is correct + peerConfig = admin.getPeerConfig(peerId); + assertNotNull(peerConfig); + assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration())); + assertEquals(peerConfig.getReplicationEndpointImpl(), + RegionReplicaReplicationEndpoint.class.getName()); + admin.close(); + } + + + public void testRegionReplicaReplication(int regionReplication) throws Exception { + // test region replica replication. 
Create a table with single region, write some data + // ensure that data is replicated to the secondary region + TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_" + + regionReplication); + HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); + htd.setRegionReplication(regionReplication); + HTU.getHBaseAdmin().createTable(htd); + TableName tableNameNoReplicas = + TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS"); + HTU.deleteTableIfAny(tableNameNoReplicas); + HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1); + + HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration()); + HTableInterface table = connection.getTable(tableName); + HTableInterface tableNoReplicas = connection.getTable(tableNameNoReplicas); + + try { + // load some data to the non-replicated table + HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000); + + // load the data to the table + HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); + + verifyReplication(tableName, regionReplication, 0, 1000); + + } finally { + table.close(); + tableNoReplicas.close(); + HTU.deleteTableIfAny(tableNameNoReplicas); + connection.close(); + } + } + + private void verifyReplication(TableName tableName, int regionReplication, + final int startRow, final int endRow) throws Exception { + // find the regions + final HRegion[] regions = new HRegion[regionReplication]; + + for (int i=0; i < NB_SERVERS; i++) { + HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); + List onlineRegions = rs.getOnlineRegions(tableName); + for (HRegion region : onlineRegions) { + regions[region.getRegionInfo().getReplicaId()] = region; + } + } + + for (HRegion region : regions) { + assertNotNull(region); + } + + for (int i = 1; i < regionReplication; i++) { + final HRegion region = regions[i]; + // wait until all the data is replicated to all secondary regions + Waiter.waitFor(HTU.getConfiguration(), 60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + LOG.info("verifying replication for region replica:" + region.getRegionInfo()); + try { + HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow); + } catch(Throwable ex) { + LOG.warn("Verification from secondary region is not complete yet. Got:" + ex + + " " + ex.getMessage()); + // still wait + return false; + } + return true; + } + }); + } + } + + @Test(timeout = 60000) + public void testRegionReplicaReplicationWith2Replicas() throws Exception { + testRegionReplicaReplication(2); + } + + @Test(timeout = 60000) + public void testRegionReplicaReplicationWith3Replicas() throws Exception { + testRegionReplicaReplication(3); + } + + @Test(timeout = 60000) + public void testRegionReplicaReplicationWith10Replicas() throws Exception { + testRegionReplicaReplication(10); + } + + @Test (timeout = 60000) + public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception { + // Tests a table with region replication 3. Writes some data, and causes flushes and + // compactions. Verifies that the data is readable from the replicas. 
Note that this + // does not test whether the replicas actually pick up flushed files and apply compaction + // to their stores + int regionReplication = 3; + TableName tableName = TableName.valueOf("testRegionReplicaReplicationForFlushAndCompaction"); + HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); + htd.setRegionReplication(regionReplication); + HTU.getHBaseAdmin().createTable(htd); + + + HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration()); + HTableInterface table = connection.getTable(tableName); + + try { + // load the data to the table + + for (int i = 0; i < 6000; i += 1000) { + LOG.info("Writing data from " + i + " to " + (i+1000)); + HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000); + LOG.info("flushing table"); + HTU.flush(tableName); + LOG.info("compacting table"); + HTU.compact(tableName, false); + } + + verifyReplication(tableName, regionReplication, 0, 6000); + } finally { + table.close(); + connection.close(); + } + } + + @Test (timeout = 60000) + public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception { + testRegionReplicaReplicationIgnoresDisabledTables(false); + } + + @Test (timeout = 60000) + public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception { + testRegionReplicaReplicationIgnoresDisabledTables(true); + } + + public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable) + throws Exception { + // tests having edits from a disabled or dropped table is handled correctly by skipping those + // entries and further edits after the edits from dropped/disabled table can be replicated + // without problems. + TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables" + + dropTable); + HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); + int regionReplication = 3; + htd.setRegionReplication(regionReplication); + HTU.deleteTableIfAny(tableName); + HTU.getHBaseAdmin().createTable(htd); + TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable"); + HTU.deleteTableIfAny(toBeDisabledTable); + htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); + htd.setRegionReplication(regionReplication); + HTU.getHBaseAdmin().createTable(htd); + + // both tables are created, now pause replication + ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); + admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); + + // now that the replication is disabled, write to the table to be dropped, then drop the table. 
+ + HConnection connection = HConnectionManager.createConnection(HTU.getConfiguration()); + HTableInterface table = connection.getTable(tableName); + HTableInterface tableToBeDisabled = connection.getTable(toBeDisabledTable); + + HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); + + AtomicLong skippedEdits = new AtomicLong(); + RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = + mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); + when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); + RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = + new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, + (ClusterConnection) connection, + Executors.newSingleThreadExecutor(), 1000); + + HRegionLocation hrl = connection.locateRegion(toBeDisabledTable, HConstants.EMPTY_BYTE_ARRAY); + byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); + + Entry entry = new Entry( + new WALKey(encodedRegionName, toBeDisabledTable, 1), + new WALEdit()); + + HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table + if (dropTable) { + HTU.getHBaseAdmin().deleteTable(toBeDisabledTable); + } + + sinkWriter.append(toBeDisabledTable, encodedRegionName, + HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); + + assertEquals(2, skippedEdits.get()); + + try { + // load some data to the to-be-dropped table + + // load the data to the table + HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); + + // now enable the replication + admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); + + verifyReplication(tableName, regionReplication, 0, 1000); + + } finally { + admin.close(); + table.close(); + tableToBeDisabled.close(); + HTU.deleteTableIfAny(toBeDisabledTable); + connection.close(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java new file mode 100644 index 00000000000..8ecfd768869 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion; +import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.coprocessor.BaseWALObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext; +import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this + * class contains lower level tests using callables. 
+ */ +@Category(MediumTests.class) +public class TestRegionReplicaReplicationEndpointNoMaster { + + private static final Log LOG = LogFactory.getLog( + TestRegionReplicaReplicationEndpointNoMaster.class); + + private static final int NB_SERVERS = 2; + private static TableName tableName = TableName.valueOf( + TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName()); + private static HTable table; + private static final byte[] row = "TestRegionReplicaReplicator".getBytes(); + + private static HRegionServer rs0; + private static HRegionServer rs1; + + private static HRegionInfo hriPrimary; + private static HRegionInfo hriSecondary; + + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + private static final byte[] f = HConstants.CATALOG_FAMILY; + + @BeforeClass + public static void beforeClass() throws Exception { + Configuration conf = HTU.getConfiguration(); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); + + // install WALObserver coprocessor for tests + String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY); + if (walCoprocs == null) { + walCoprocs = WALEditCopro.class.getName(); + } else { + walCoprocs += "," + WALEditCopro.class.getName(); + } + HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + walCoprocs); + HTU.startMiniCluster(NB_SERVERS); + + // Create table then get the single region for our new table. + HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); + table = HTU.createTable(htd, new byte[][]{f}, HTU.getConfiguration()); + + hriPrimary = table.getRegionLocation(row, false).getRegionInfo(); + + // mock a secondary region info to open + hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), + hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); + + // No master + TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); + rs0 = HTU.getMiniHBaseCluster().getRegionServer(0); + rs1 = HTU.getMiniHBaseCluster().getRegionServer(1); + } + + @AfterClass + public static void afterClass() throws Exception { + table.close(); + HTU.shutdownMiniCluster(); + } + + @Before + public void before() throws Exception{ + entries.clear(); + } + + @After + public void after() throws Exception { + } + + static ConcurrentLinkedQueue entries = new ConcurrentLinkedQueue(); + + public static class WALEditCopro extends BaseWALObserver { + public WALEditCopro() { + entries.clear(); + } + @Override + public void postWALWrite(ObserverContext ctx, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + // only keep primary region's edits + if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) { + entries.add(new Entry(logKey, logEdit)); + } + } + } + + @Test + public void testReplayCallable() throws Exception { + // tests replaying the edits to a secondary region replica using the Callable directly + openRegion(HTU, rs0, hriSecondary); + ClusterConnection connection = + (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration()); + + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + + Assert.assertEquals(1000, entries.size()); + // replay the edits to the secondary using replay callable + replicateUsingCallable(connection, entries); + + HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName()); + HTU.verifyNumericRows(region, f, 0, 1000); + + 
HTU.deleteNumericRows(table, f, 0, 1000); + closeRegion(HTU, rs0, hriSecondary); + connection.close(); + } + + private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries) + throws IOException, RuntimeException { + Entry entry; + while ((entry = entries.poll()) != null) { + byte[] row = entry.getEdit().getCells().get(0).getRow(); + RegionLocations locations = connection.locateRegion(tableName, row, true, true); + RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection, + RpcControllerFactory.instantiate(connection.getConfiguration()), + table.getName(), locations.getRegionLocation(1), + locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry), + new AtomicLong()); + + RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate( + connection.getConfiguration()); + factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000); + } + } + + @Test + public void testReplayCallableWithRegionMove() throws Exception { + // tests replaying the edits to a secondary region replica using the Callable directly while + // the region is moved to another location. It tests handling of RME (RegionMovedException). + openRegion(HTU, rs0, hriSecondary); + ClusterConnection connection = + (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration()); + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + + Assert.assertEquals(1000, entries.size()); + // replay the edits to the secondary using replay callable + replicateUsingCallable(connection, entries); + + HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName()); + HTU.verifyNumericRows(region, f, 0, 1000); + + HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary + + // move the secondary region from RS0 to RS1 + closeRegion(HTU, rs0, hriSecondary); + openRegion(HTU, rs1, hriSecondary); + + // replicate the new data + replicateUsingCallable(connection, entries); + + region = rs1.getFromOnlineRegions(hriSecondary.getEncodedName()); + // verify the new data.
old data may or may not be there + HTU.verifyNumericRows(region, f, 1000, 2000); + + HTU.deleteNumericRows(table, f, 0, 2000); + closeRegion(HTU, rs1, hriSecondary); + connection.close(); + } + + @Test + public void testRegionReplicaReplicationEndpointReplicate() throws Exception { + // tests replaying the edits to a secondary region replica using the RRRE.replicate() + openRegion(HTU, rs0, hriSecondary); + ClusterConnection connection = + (ClusterConnection) HConnectionManager.createConnection(HTU.getConfiguration()); + RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint(); + + ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); + when(context.getConfiguration()).thenReturn(HTU.getConfiguration()); + + replicator.init(context); + replicator.start(); + + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + + Assert.assertEquals(1000, entries.size()); + // replay the edits to the secondary using replay callable + replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))); + + HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName()); + HTU.verifyNumericRows(region, f, 0, 1000); + + HTU.deleteNumericRows(table, f, 0, 1000); + closeRegion(HTU, rs0, hriSecondary); + connection.close(); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java index f6bd2b1d742..1f07a4cbb62 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; import java.io.IOException; import java.util.NavigableSet; @@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; +import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -128,10 +128,8 @@ public class TestWALMethods { Configuration conf = new Configuration(); RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); - WALSplitter splitter = new WALSplitter(WALFactory.getInstance(conf), - conf, mock(Path.class), mock(FileSystem.class), null, null, mode); - EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024); + EntryBuffers sink = new EntryBuffers(new PipelineController(), 1*1024*1024); for (int i = 0; i < 1000; i++) { WAL.Entry entry = createTestLogEntry(i); sink.appendEntry(entry);
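
A minimal usage sketch follows (not part of the patch itself) for the feature the tests above exercise. It assumes only configuration keys and client APIs that already appear in this change (REGION_REPLICA_REPLICATION_CONF_KEY, HTableDescriptor.setRegionReplication, ReplicationAdmin/ReplicationPeerConfig); the table name, column family, and class name are illustrative placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;

// Illustrative sketch only; class, table, and family names below are hypothetical.
public class RegionReplicaReplicationUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Both cluster replication and region replica replication must be enabled.
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);

    // Create a table with region replication > 1; the "region_replica_replication"
    // peer is created automatically and secondaries receive edits via async WAL replication.
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("usage_sketch_table"));
    htd.addFamily(new HColumnDescriptor("f"));
    htd.setRegionReplication(3);
    HBaseAdmin admin = new HBaseAdmin(conf);
    admin.createTable(htd);
    admin.close();

    // The peer can be inspected, disabled, or re-enabled through ReplicationAdmin,
    // as the tests above do around disabled/dropped tables.
    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
    ReplicationPeerConfig peer =
        replicationAdmin.getPeerConfig(ServerRegionReplicaUtil.getReplicationPeerId());
    System.out.println("replication endpoint: " + peer.getReplicationEndpointImpl());
    replicationAdmin.close();
  }
}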