HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)

Signed-off-by: Xu Cang <xucang@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>

This commit is contained in:
parent 33c9f774d6
commit d724d0576f
@@ -30,12 +30,12 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;

 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

 /**
@@ -265,6 +264,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }

+  @InterfaceAudience.Private
+  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+    return logQueue.getQueues();
+  }
+
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
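The new getQueues() accessor exposes the per-walGroup log queues so tests can assert on queue state. A minimal sketch of how a test might use it; the "source" variable and the assertion are illustrative, not part of this patch:

// Hypothetical test-side check, assuming a running ReplicationSource named "source".
Map<String, PriorityBlockingQueue<Path>> queues = source.getQueues();
for (Map.Entry<String, PriorityBlockingQueue<Path>> e : queues.entrySet()) {
  // One queue per wal group; in a healthy steady state only the live WAL is queued.
  assert e.getValue().size() == 1 : "unexpected backlog in group " + e.getKey();
}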
@@ -41,7 +41,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@@ -123,44 +122,64 @@ class ReplicationSourceWALReader extends Thread {
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, conf, currentPosition,
-          source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
-          source.getSourceMetrics(), walGroupId)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream = null;
+    try {
+      // we only loop back here if something fatal happened to our stream
+      while (isReaderRunning()) {
+        try {
+          entryStream =
+            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
+              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+
+            batch = createBatch(entryStream);
+            batch = readWALEntries(entryStream, batch);
+            currentPosition = entryStream.getPosition();
+            if (batch == null) {
+              // either the queue has no WAL to read
+              // or we got no new entries (didn't advance the position in the WAL)
+              handleEmptyWALEntryBatch();
+              entryStream.reset(); // reuse stream
+            } else {
+              addBatchToShippingQueue(batch);
+            }
           }
-          if (!checkQuota()) {
-            continue;
-          }
-          WALEntryBatch batch = readWALEntries(entryStream);
-          currentPosition = entryStream.getPosition();
-          if (batch != null) {
-            // need to propagate the batch even if it has no entries since it may carry the last
-            // sequence id information for serial replication.
-            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
-            entryBatchQueue.put(batch);
-          } else { // got no entries and didn't advance position in WAL
-            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
-            entryStream.reset(); // reuse stream
+        } catch (IOException e) { // stream related
+          if (handleEofException(e, batch)) {
+            sleepMultiplier = 1;
+          } else {
+            LOG.warn("Failed to read stream of replication entries", e);
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              sleepMultiplier++;
+            }
+            Threads.sleep(sleepForRetries * sleepMultiplier);
           }
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while sleeping between WAL reads");
+          Thread.currentThread().interrupt();
+        } finally {
+          entryStream.close();
         }
-      } catch (IOException e) { // stream related
-        if (!handleEofException(e)) {
-          LOG.warn("Failed to read stream of replication entries", e);
-          if (sleepMultiplier < maxRetriesMultiplier) {
-            sleepMultiplier++;
-          }
-          Threads.sleep(sleepForRetries * sleepMultiplier);
-        }
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while sleeping between WAL reads");
-        Thread.currentThread().interrupt();
       }
+    } catch (IOException e) {
+      if (sleepMultiplier < maxRetriesMultiplier) {
+        LOG.debug("Failed to read stream of replication entries: " + e);
+        sleepMultiplier++;
+      } else {
+        LOG.error("Failed to read stream of replication entries", e);
+      }
+      Threads.sleep(sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.trace("Interrupted while sleeping between WAL reads");
+      Thread.currentThread().interrupt();
    }
   }
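The key structural change above: the in-progress WALEntryBatch and the stream are hoisted out of the try-with-resources so the IOException handler can still see a partially filled batch and ship it. A minimal, self-contained sketch of this "propagate the partial batch into the error handler" pattern; the Batch and Stream types here are illustrative stand-ins, not HBase classes:

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class PartialBatchLoop {
  // Illustrative stand-ins for WALEntryBatch / WALEntryStream.
  static class Batch { final List<String> entries = new ArrayList<>(); }
  interface Stream extends AutoCloseable { String next() throws IOException; void close(); }

  void run(Stream stream) {
    Batch batch = null; // declared outside the try so the catch block can still ship it
    try {
      batch = new Batch();
      for (;;) {
        batch.entries.add(stream.next()); // may throw mid-batch
      }
    } catch (IOException e) {
      // Before this fix the batch was scoped inside the try, so entries read
      // before an EOFException were silently dropped; now we can still ship them.
      if (e instanceof EOFException && batch != null) {
        ship(batch);
      }
    } finally {
      stream.close();
    }
  }

  void ship(Batch batch) { /* hand off to the shipper thread */ }
}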
@@ -189,14 +208,19 @@ class ReplicationSourceWALReader extends Thread {
     return newPath == null || !path.getName().equals(newPath.getName());
   }

-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
-      throws IOException, InterruptedException {
+  // We need to get the WALEntryBatch from the caller so we can add entries to it.
+  // This is required in case there is any exception while reading entries:
+  // we do not want to lose the entries already in the batch.
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
+      WALEntryBatch batch) throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
       // check whether we have switched a file
       if (currentPath != null && switched(entryStream, currentPath)) {
         return WALEntryBatch.endOfFile(currentPath);
       } else {
         // This would mean either no more files in the queue
         // or there is no new data yet on the current wal
         return null;
       }
     }
@@ -208,7 +232,7 @@ class ReplicationSourceWALReader extends Thread {
       // when reading from the entry stream first time we will enter here
       currentPath = entryStream.getCurrentPath();
     }
-    WALEntryBatch batch = createBatch(entryStream);
+    batch.setLastWalPath(currentPath);
     for (;;) {
       Entry entry = entryStream.next();
       batch.setLastWalPosition(entryStream.getPosition());
@@ -231,10 +255,12 @@ class ReplicationSourceWALReader extends Thread {
     return batch;
   }

-  private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+  private void handleEmptyWALEntryBatch() throws InterruptedException {
     LOG.trace("Didn't read any new entries from WAL");
-    if (source.isRecovered()) {
-      // we're done with queue recovery, shut ourselves down
+    if (logQueue.getQueue(walGroupId).isEmpty()) {
+      // we're done with the current queue, either this is a recovered queue, or it is the special
+      // group for a sync replication peer and the peer has been transited to DA or S state.
       LOG.debug("Stopping the replication source wal reader");
       setReaderRunning(false);
       // shuts down shipper thread immediately
       entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
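Note that the reader shuts the shipper down by enqueuing the WALEntryBatch.NO_MORE_DATA sentinel rather than interrupting the thread. A minimal sketch of that poison-pill handshake between a producer and a consumer; these are generic types, not the HBase classes:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPill {
  static final String NO_MORE_DATA = "NO_MORE_DATA"; // sentinel, compared by identity

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    Thread shipper = new Thread(() -> {
      try {
        for (;;) {
          String batch = queue.take();
          if (batch == NO_MORE_DATA) {
            return; // shut down promptly instead of blocking on an empty queue
          }
          System.out.println("shipping " + batch);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    shipper.start();
    queue.put("batch-1");
    queue.put(NO_MORE_DATA); // reader is done; wake the shipper so it can exit
    shipper.join();
  }
}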
@@ -244,22 +270,38 @@ class ReplicationSourceWALReader extends Thread {
   }

   /**
-   * if we get an EOF due to a zero-length log, and there are other logs in queue
-   * (highly likely we've closed the current log), and autorecovery is
-   * enabled, then dump the log
+   * This is to handle the EOFException from the WAL entry stream. EOFException should
+   * be handled carefully because there are chances of data loss because of never replicating
+   * the data. Thus we should always try to ship the existing batch of entries here.
+   * If there was only one log in the queue before EOF, we ship the empty batch here
+   * and, since the reader is still active, in the next iteration of the reader we will
+   * stop the reader.
+   * If there was more than one log in the queue before EOF, we ship the existing batch
+   * and reset the wal path and position to the log with EOF, so the shipper can remove
+   * logs from the replication queue.
+   * @return true only if the IOE can be handled
    */
-  private boolean handleEofException(IOException e) {
+  private boolean handleEofException(IOException e, WALEntryBatch batch)
+      throws InterruptedException {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is a recovered source,
     // since we don't add the current log to a recovered source queue, so it is safe to remove.
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
+        && (source.isRecovered() || queue.size() > 1)
+        && this.eofAutoRecovery) {
+      Path head = queue.peek();
       try {
-        if (fs.getFileStatus(queue.peek()).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
+        if (fs.getFileStatus(head).getLen() == 0) {
+          // head of the queue is an empty log file
+          LOG.warn("Forcing removal of 0 length log in queue: {}", head);
           logQueue.remove(walGroupId);
           currentPosition = 0;
+          // After we removed the WAL from the queue, we should try shipping the existing batch
+          // of entries and set the wal position and path to the wal just dequeued to correctly
+          // remove logs from the zk
+          batch.setLastWalPath(head);
+          batch.setLastWalPosition(currentPosition);
+          addBatchToShippingQueue(batch);
           return true;
         }
       } catch (IOException ioe) {
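The conditions guarding auto-recovery are easy to misread in diff form. Roughly, the handler dumps the queue head only when all three hold: the failure is an EOF (directly or as the cause), removing the head is safe (a recovered source, or more files queued behind it), and replication.source.eof.autorecovery is enabled; it then ships the pending batch pointed at the dequeued WAL so the shipper can clear the queue entry in ZooKeeper. A condensed sketch of just that predicate, with names abbreviated from the patch; this is not a drop-in replacement for the method above:

// Condensed from handleEofException above, for readability only.
private boolean shouldAutoRecover(IOException e, java.util.Queue<Path> queue,
    boolean sourceIsRecovered, boolean eofAutoRecovery) {
  boolean isEof = e instanceof EOFException || e.getCause() instanceof EOFException;
  // A recovered source never has the live WAL in its queue, so its head is always
  // safe to drop; otherwise we require another file queued behind the empty one.
  boolean safeToDropHead = sourceIsRecovered || queue.size() > 1;
  return isEof && safeToDropHead && eofAutoRecovery;
}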
@@ -269,6 +311,20 @@ class ReplicationSourceWALReader extends Thread {
     return false;
   }

+  /**
+   * Update the batch and try to ship it.
+   * @param batch Batch of entries to ship
+   * @throws InterruptedException if interrupted while waiting on the entry queue
+   * @throws IOException if the stream throws an IO exception
+   */
+  private void addBatchToShippingQueue(WALEntryBatch batch)
+      throws InterruptedException, IOException {
+    // need to propagate the batch even if it has no entries since it may carry the last
+    // sequence id information for serial replication.
+    LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+    entryBatchQueue.put(batch);
+  }
+
   public Path getCurrentPath() {
     // if we've read some WAL entries, get the Path we read from
     WALEntryBatch batchQueueHead = entryBatchQueue.peek();
@@ -50,7 +50,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
   }

   @Override
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
       throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
@@ -70,7 +70,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
       currentPath = entryStream.getCurrentPath();
     }
     long positionBefore = entryStream.getPosition();
-    WALEntryBatch batch = createBatch(entryStream);
+    batch = createBatch(entryStream);
     for (;;) {
       Entry entry = entryStream.peek();
       boolean doFiltering = true;
@@ -94,6 +94,10 @@ class WALEntryBatch {
     return lastWalPath;
   }

+  public void setLastWalPath(Path lastWalPath) {
+    this.lastWalPath = lastWalPath;
+  }
+
   /**
    * @return the position in the last WAL that was read.
    */
@@ -80,7 +80,7 @@ class WALEntryStream implements Closeable {
    * @param walFileLengthProvider provides the length of the WAL file
    * @param serverName the server name which all WALs belong to
    * @param metrics the replication metrics
-   * @throws IOException
+   * @throws IOException if an IO exception is thrown from the stream
    */
   public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
       long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
@@ -368,7 +368,9 @@ class WALEntryStream implements Closeable {
       handleFileNotFound(path, fnfe);
     } catch (RemoteException re) {
       IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) throw ioe;
+      if (!(ioe instanceof FileNotFoundException)) {
+        throw ioe;
+      }
       handleFileNotFound(path, (FileNotFoundException) ioe);
     } catch (LeaseNotRecoveredException lnre) {
       // HBASE-15019 the WAL was not closed due to some hiccup.
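For readers unfamiliar with the HDFS idiom used here: server-side exceptions arrive wrapped in org.apache.hadoop.ipc.RemoteException, and unwrapRemoteException(Class...) only converts them back to the listed types, returning the RemoteException itself otherwise. A sketch of the general pattern; the readFile call is a hypothetical stand-in:

// Illustrative only: unwrap a server-side FileNotFoundException, rethrow anything else.
try {
  readFile(path); // hypothetical call that can throw RemoteException
} catch (RemoteException re) {
  IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
  if (ioe instanceof FileNotFoundException) {
    // expected case: the WAL was archived or moved; fall back to the archive location
  } else {
    throw ioe; // unwrap did not match; ioe is still the original RemoteException
  }
}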
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -43,8 +43,10 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -52,7 +54,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

 /**
@@ -63,7 +65,8 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
  */
 public class TestReplicationBase {
   private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);

+  private static Connection connection1;
+  private static Connection connection2;
   protected static Configuration CONF_WITH_LOCALFS;

   protected static ReplicationAdmin admin;
@@ -84,6 +87,8 @@ public class TestReplicationBase {
       NB_ROWS_IN_BATCH * 10;
   protected static final long SLEEP_TIME = 500;
   protected static final int NB_RETRIES = 50;
+  protected static AtomicInteger replicateCount = new AtomicInteger();
+  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

   protected static final TableName tableName = TableName.valueOf("test");
   protected static final byte[] famName = Bytes.toBytes("f");
@@ -238,26 +243,26 @@ public class TestReplicationBase {
     // as a component in deciding maximum number of parallel batches to send to the peer cluster.
     UTIL2.startMiniCluster(NUM_SLAVES2);

     admin = new ReplicationAdmin(CONF1);
-    hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();
+    connection1 = ConnectionFactory.createConnection(CONF1);
+    connection2 = ConnectionFactory.createConnection(CONF2);
+    hbaseAdmin = connection1.getAdmin();

     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
         .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
             .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
         .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

-    Connection connection1 = ConnectionFactory.createConnection(CONF1);
-    Connection connection2 = ConnectionFactory.createConnection(CONF2);
-    try (Admin admin1 = connection1.getAdmin()) {
+    try (
+      Admin admin1 = connection1.getAdmin();
+      Admin admin2 = connection2.getAdmin()) {
       admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-    }
-    try (Admin admin2 = connection2.getAdmin()) {
       admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+      UTIL1.waitUntilAllRegionsAssigned(tableName);
+      htable1 = connection1.getTable(tableName);
+      UTIL2.waitUntilAllRegionsAssigned(tableName);
+      htable2 = connection2.getTable(tableName);
     }
-    UTIL1.waitUntilAllRegionsAssigned(tableName);
-    UTIL2.waitUntilAllRegionsAssigned(tableName);
-    htable1 = connection1.getTable(tableName);
-    htable2 = connection2.getTable(tableName);
-
   }

   @BeforeClass
@@ -273,9 +278,10 @@ public class TestReplicationBase {
   @Before
   public void setUpBase() throws Exception {
     if (!peerExist(PEER_ID2)) {
-      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
-          .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build();
-      hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
+      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
+          .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
+          .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
+      hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
     }
   }
@@ -351,7 +357,33 @@ public class TestReplicationBase {
     if (admin != null) {
       admin.close();
     }
     if (hbaseAdmin != null) {
       hbaseAdmin.close();
     }
+
+    if (connection2 != null) {
+      connection2.close();
+    }
+    if (connection1 != null) {
+      connection1.close();
+    }
     UTIL2.shutdownMiniCluster();
     UTIL1.shutdownMiniCluster();
   }
+
+  /**
+   * Custom replication endpoint to keep track of replication status for tests.
+   */
+  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
+    public ReplicationEndpointTest() {
+      replicateCount.set(0);
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      replicateCount.incrementAndGet();
+      replicatedEntries.addAll(replicateContext.getEntries());
+
+      return super.replicate(replicateContext);
+    }
+  }
 }
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * http://www.apache.org/licenses/LICENSE-2.0
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,56 +18,99 @@ package org.apache.hadoop.hbase.replication;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
+@Category({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery
+    extends TestReplicationBase {
+  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);

   @Before
   public void setUp() throws IOException, InterruptedException {
     cleanUp();
+    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
+    replicateCount.set(0);
+    replicatedEntries.clear();
   }

   /**
    * Waits until there is only one log (the current writing one) in the replication queue
-   * @param numRs number of regionservers
+   *
+   * @param numRs number of region servers
    */
-  private void waitForLogAdvance(int numRs) throws Exception {
-    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
+  private void waitForLogAdvance(int numRs) {
+    Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         for (int i = 0; i < numRs; i++) {
           HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
           RegionInfo regionInfo =
-              UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
           WAL wal = hrs.getWAL(regionInfo);
           Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
-          Replication replicationService = (Replication) UTIL1.getHBaseCluster()
-              .getRegionServer(i).getReplicationSourceService();
+          Replication replicationService =
+            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
           for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
-              .getSources()) {
+            .getSources()) {
             ReplicationSource source = (ReplicationSource) rsi;
-            if (!currentFile.equals(source.getCurrentPath())) {
+            // We are making sure that there is only one log queue and that is for the
+            // current WAL of the region server
+            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
+            if (!currentFile.equals(source.getCurrentPath())
+                || source.getQueues().keySet().size() != 1
+                || source.getQueues().get(logPrefix).size() != 1) {
               return false;
             }
           }
         }
         return true;
       }
     });
   }

+  private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
+    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() {
+        for (int i = 0; i < numRs; i++) {
+          Replication replicationService =
+            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+            .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
+            if (source.getQueues().get(logPrefix).size() != numQueues) {
+              return false;
+            }
+          }
+        }
@@ -82,13 +123,12 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
   @Test
   public void testEmptyWALRecovery() throws Exception {
     final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
-
     // for each RS, create an empty wal with same walGroupId
     final List<Path> emptyWalPaths = new ArrayList<>();
     long ts = System.currentTimeMillis();
     for (int i = 0; i < numRs; i++) {
       RegionInfo regionInfo =
-          UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
       WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
       Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
       String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
@@ -97,10 +137,197 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
       emptyWalPaths.add(emptyWalPath);
     }

-    // inject our empty wal into the replication queue, and then roll the original wal, which
-    // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
-    // determine if the file being replicated currently is still opened for write, so just injecting
-    // a new wal to the replication queue does not mean the previous file is closed.
+    injectEmptyWAL(numRs, emptyWalPaths);

     // ReplicationSource should advance past the empty wal, or else the test will fail
     waitForLogAdvance(numRs);
+    verifyNumberOfLogsInQueue(1, numRs);
     // we're now writing to the new wal
     // if everything works, the source should've stopped reading from the empty wal, and start
     // replicating from the new wal
     runSimplePutDeleteTest();
+    rollWalsAndWaitForDeque(numRs);
   }

+  /**
+   * Test an empty WAL along with non-empty WALs in the same batch. This test is to make sure
+   * that when we see the empty WAL and handle the EOF exception, we are able to ship the
+   * existing batch of entries without losing it. This test also verifies the number of
+   * batches shipped.
+   * @throws Exception throws any exception
+   */
+  @Test
+  public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
+    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    int numOfEntriesToReplicate = 20;
+
+    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+    long ts = System.currentTimeMillis();
+    for (int i = 0; i < numRs; i++) {
+      RegionInfo regionInfo =
+        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+
+      appendEntriesToWal(numOfEntriesToReplicate, wal);
+      wal.rollWriter();
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
+      UTIL1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+
+    injectEmptyWAL(numRs, emptyWalPaths);
+    // There should be three WALs in queue
+    // 1. empty WAL
+    // 2. non empty WAL
+    // 3. live WAL
+    //verifyNumberOfLogsInQueue(3, numRs);
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs);
+
+    // Now we should expect numOfEntriesToReplicate entries
+    // replicated from each region server. This makes sure we didn't lose data
+    // from any previous batch when we encounter the EOF exception for an empty file.
+    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
+      replicatedEntries.size());
+
+    // We expect just one batch of replication, which will
+    // be from when we handle the EOF exception.
+    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
+    verifyNumberOfLogsInQueue(1, numRs);
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    runSimplePutDeleteTest();
+    rollWalsAndWaitForDeque(numRs);
+  }
+
+  /**
+   * Test an empty WAL along with non-empty WALs in the same batch. This test is to make sure
+   * that when we see the empty WAL and handle the EOF exception, we are able to proceed
+   * with the next batch and replicate it properly without missing data.
+   * @throws Exception throws any exception
+   */
+  @Test
+  public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
+    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    int numOfEntriesToReplicate = 20;
+
+    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+
+    long ts = System.currentTimeMillis();
+    WAL wal = null;
+    for (int i = 0; i < numRs; i++) {
+      RegionInfo regionInfo =
+        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+      appendEntriesToWal(numOfEntriesToReplicate, wal);
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
+      UTIL1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+    injectEmptyWAL(numRs, emptyWalPaths);
+    // roll the WAL now
+    for (int i = 0; i < numRs; i++) {
+      wal.rollWriter();
+    }
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs);
+
+    // Now we should expect numOfEntriesToReplicate entries
+    // replicated from each region server. This makes sure we didn't lose data
+    // from any previous batch when we encounter the EOF exception for an empty file.
+    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
+      replicatedEntries.size());
+
+    // We expect just one batch of replication to be shipped, which will
+    // be for the non empty WAL
+    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
+    verifyNumberOfLogsInQueue(1, numRs);
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    runSimplePutDeleteTest();
+    rollWalsAndWaitForDeque(numRs);
+  }
+
+  /**
+   * This test makes sure we replicate all the entries from the non-empty WALs which
+   * surround the empty WALs.
+   * @throws Exception throws exception
+   */
+  @Test
+  public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
+    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    int numOfEntriesToReplicate = 20;
+
+    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+
+    long ts = System.currentTimeMillis();
+    WAL wal = null;
+    for (int i = 0; i < numRs; i++) {
+      RegionInfo regionInfo =
+        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+      appendEntriesToWal(numOfEntriesToReplicate, wal);
+      wal.rollWriter();
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
+      UTIL1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+    injectEmptyWAL(numRs, emptyWalPaths);
+
+    // roll the WAL again with some entries
+    for (int i = 0; i < numRs; i++) {
+      appendEntriesToWal(numOfEntriesToReplicate, wal);
+      wal.rollWriter();
+    }
+
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs);
+
+    // Now we should expect numOfEntriesToReplicate entries
+    // replicated from each region server. This makes sure we didn't lose data
+    // from any previous batch when we encounter the EOF exception for an empty file.
+    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
+      replicatedEntries.size());
+
+    // We expect two batches of replication to be shipped, which will
+    // be for the non empty WALs
+    Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
+    verifyNumberOfLogsInQueue(1, numRs);
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    runSimplePutDeleteTest();
+    rollWalsAndWaitForDeque(numRs);
+  }
+
+  // inject our empty wal into the replication queue, and then roll the original wal, which
+  // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
+  // determine if the file being replicated currently is still opened for write, so just injecting
+  // a new wal to the replication queue does not mean the previous file is closed.
+  private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
     for (int i = 0; i < numRs; i++) {
       HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
       Replication replicationService = (Replication) hrs.getReplicationSourceService();
@@ -111,13 +338,32 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
       WAL wal = hrs.getWAL(regionInfo);
       wal.rollWriter(true);
     }
   }

-    // ReplicationSource should advance past the empty wal, or else the test will fail
-    waitForLogAdvance(numRs);
-
-    // we're now writing to the new wal
-    // if everything works, the source should've stopped reading from the empty wal, and start
-    // replicating from the new wal
-    runSimplePutDeleteTest();
-  }
+  protected WALKeyImpl getWalKeyImpl() {
+    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
+  }
+
+  // Roll the WAL and wait for it to get dequeued from the log queue
+  private void rollWalsAndWaitForDeque(int numRs) throws IOException {
+    RegionInfo regionInfo =
+      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+    for (int i = 0; i < numRs; i++) {
+      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      wal.rollWriter();
+    }
+    waitForLogAdvance(numRs);
+  }
+
+  private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
+    long txId = -1;
+    for (int i = 0; i < numEntries; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b, famName, b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      txId = wal.appendData(info, getWalKeyImpl(), edit);
+    }
+    wal.sync(txId);
+  }
 }
@@ -27,9 +27,9 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -83,7 +84,6 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestWALEntryStream {
@@ -687,6 +687,7 @@ public class TestWALEntryStream {
     // Override the max retries multiplier to fail fast.
     conf.setInt("replication.source.maxretriesmultiplier", 1);
     conf.setBoolean("replication.source.eof.autorecovery", true);
+    conf.setInt("replication.source.nb.batches", 10);
     // Create a reader thread with source as recovered source.
     ReplicationSource source = mockReplicationSource(true, conf);
     when(source.isPeerEnabled()).thenReturn(true);
@@ -705,7 +706,64 @@ public class TestWALEntryStream {
     assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
   }

+  @Test
+  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
+    Configuration conf = new Configuration(CONF);
+    MetricsSource metrics = mock(MetricsSource.class);
+    ReplicationSource source = mockReplicationSource(true, conf);
+    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
+    // Create a 0 length log.
+    Path emptyLog = new Path(fs.getHomeDirectory(), "log.2");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
+
+    final Path log1 = new Path(fs.getHomeDirectory(), "log.1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+    localLogQueue.enqueueLog(log1, fakeWalGroupId);
+
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    // Make it look like the source is a recovered source.
+    when(mockSourceManager.getOldSources())
+      .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source)));
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    conf.setInt("replication.source.nb.batches", 10);
+    // Create a reader thread.
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
+        getDummyFilter(), source, fakeWalGroupId);
+    assertEquals("Initial log queue size is not correct",
+      2, localLogQueue.getQueueSize(fakeWalGroupId));
+    reader.run();
+
+    // The reader should have removed the empty log from the log queue.
+    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
+  }
+
   private PriorityBlockingQueue<Path> getQueue() {
     return logQueue.getQueue(fakeWalGroupId);
   }

+  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
+    for (int i = 0; i < numEntries; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b, b, b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
+        HConstants.DEFAULT_CLUSTER_ID);
+      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+      writer.append(new WAL.Entry(key, edit));
+      writer.sync(false);
+    }
+    writer.close();
+  }
 }