HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2987)

Signed-off-by: Xu Cang <xucang@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Authored by Sandeep Pal on 2021-02-25 13:36:11 -08:00; committed by GitHub
parent a7d0445a21
commit 3f1c486ddb
8 changed files with 473 additions and 82 deletions
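
At a high level, the patch makes the reader thread own a long-lived WALEntryBatch: the batch is created before entries are read (so the EOF handler never dereferences a null batch) and is passed into readWALEntries and handleEofException, so entries that were already read get shipped instead of silently dropped when a zero-length WAL raises an EOFException. Below is a condensed paraphrase of the patched reader loop, with retries and error handling elided; see the ReplicationSourceWALReader.java diff for the real code.

// Condensed paraphrase of the patched read loop (names match the diff below).
WALEntryBatch batch = createBatch(entryStream);   // created up front, never null later
try {
  batch = readWALEntries(entryStream, batch);     // entries accumulate in the caller's batch
  if (batch == null) {
    handleEmptyWALEntryBatch();                   // no queued WALs, or no new data yet
  } else {
    addBatchToShippingQueue(batch);               // ship even an empty batch (serial replication)
  }
} catch (IOException e) {
  // EOF on a zero-length log: handleEofException drops the log, points the batch
  // at it, and ships whatever was read so far, so nothing is lost.
  if (!handleEofException(e, batch)) {
    throw e;                                      // not recoverable here; the outer retry loop applies
  }
}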

File: ReplicationSource.java

@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
@@ -260,6 +260,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
    }
  }

+  @InterfaceAudience.Private
+  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+    return logQueue.getQueues();
+  }
+
  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {

File: ReplicationSourceWALReader.java

@@ -41,7 +41,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@@ -123,44 +122,64 @@ class ReplicationSourceWALReader extends Thread {
@Override
public void run() {
  int sleepMultiplier = 1;
-  while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-    try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, conf, currentPosition,
-            source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
-            source.getSourceMetrics(), walGroupId)) {
-      while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!source.isPeerEnabled()) {
-          Threads.sleep(sleepForRetries);
-          continue;
-        }
-        if (!checkQuota()) {
-          continue;
-        }
-        WALEntryBatch batch = readWALEntries(entryStream);
-        currentPosition = entryStream.getPosition();
-        if (batch != null) {
-          // need to propagate the batch even it has no entries since it may carry the last
-          // sequence id information for serial replication.
-          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
-          entryBatchQueue.put(batch);
-          sleepMultiplier = 1;
-        } else { // got no entries and didn't advance position in WAL
-          handleEmptyWALEntryBatch();
-          entryStream.reset(); // reuse stream
-        }
-      }
-    } catch (IOException e) { // stream related
-      if (!handleEofException(e)) {
-        LOG.warn("Failed to read stream of replication entries", e);
-        if (sleepMultiplier < maxRetriesMultiplier) {
-          sleepMultiplier++;
-        }
-        Threads.sleep(sleepForRetries * sleepMultiplier);
-      }
-    } catch (InterruptedException e) {
-      LOG.trace("Interrupted while sleeping between WAL reads");
-      Thread.currentThread().interrupt();
-    }
-  }
-}
+  WALEntryBatch batch = null;
+  WALEntryStream entryStream = null;
+  try {
+    // we only loop back here if something fatal happened to our stream
+    while (isReaderRunning()) {
+      try {
+        entryStream =
+            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
+                source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = createBatch(entryStream);
+          batch = readWALEntries(entryStream, batch);
+          currentPosition = entryStream.getPosition();
+          if (batch == null) {
+            // either the queue have no WAL to read
+            // or got no new entries (didn't advance position in WAL)
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+          } else {
+            addBatchToShippingQueue(batch);
+          }
+        }
+      } catch (IOException e) { // stream related
+        if (handleEofException(e, batch)) {
+          sleepMultiplier = 1;
+        } else {
+          LOG.warn("Failed to read stream of replication entries", e);
+          if (sleepMultiplier < maxRetriesMultiplier) {
+            sleepMultiplier++;
+          }
+          Threads.sleep(sleepForRetries * sleepMultiplier);
+        }
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while sleeping between WAL reads");
+        Thread.currentThread().interrupt();
+      } finally {
+        entryStream.close();
+      }
+    }
+  } catch (IOException e) {
+    if (sleepMultiplier < maxRetriesMultiplier) {
+      LOG.debug("Failed to read stream of replication entries: " + e);
+      sleepMultiplier++;
+    } else {
+      LOG.error("Failed to read stream of replication entries", e);
+    }
+    Threads.sleep(sleepForRetries * sleepMultiplier);
+  } catch (InterruptedException e) {
+    LOG.trace("Interrupted while sleeping between WAL reads");
+    Thread.currentThread().interrupt();
+  }
+}
@@ -189,14 +208,19 @@ class ReplicationSourceWALReader extends Thread {
    return newPath == null || !path.getName().equals(newPath.getName());
  }

-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
-      throws IOException, InterruptedException {
+  // We need to get the WALEntryBatch from the caller so we can add entries to it.
+  // This is required in case there is any exception while reading entries:
+  // we do not want to lose the existing entries in the batch.
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
+      WALEntryBatch batch) throws IOException, InterruptedException {
    Path currentPath = entryStream.getCurrentPath();
    if (!entryStream.hasNext()) {
      // check whether we have switched a file
      if (currentPath != null && switched(entryStream, currentPath)) {
        return WALEntryBatch.endOfFile(currentPath);
      } else {
+        // This would mean either no more files in the queue
+        // or there is no new data yet on the current wal
        return null;
      }
    }
@@ -208,7 +232,7 @@ class ReplicationSourceWALReader extends Thread {
      // when reading from the entry stream first time we will enter here
      currentPath = entryStream.getCurrentPath();
    }
-    WALEntryBatch batch = createBatch(entryStream);
+    batch.setLastWalPath(currentPath);
    for (;;) {
      Entry entry = entryStream.next();
      batch.setLastWalPosition(entryStream.getPosition());
@@ -236,6 +260,7 @@ class ReplicationSourceWALReader extends Thread {
    if (logQueue.getQueue(walGroupId).isEmpty()) {
      // we're done with current queue, either this is a recovered queue, or it is the special group
      // for a sync replication peer and the peer has been transited to DA or S state.
+      LOG.debug("Stopping the replication source wal reader");
      setReaderRunning(false);
      // shuts down shipper thread immediately
      entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
@@ -245,22 +270,38 @@ class ReplicationSourceWALReader extends Thread {
  }

  /**
-   * if we get an EOF due to a zero-length log, and there are other logs in queue
-   * (highly likely we've closed the current log), and autorecovery is
-   * enabled, then dump the log
+   * This is to handle the EOFException from the WAL entry stream. EOFException should
+   * be handled carefully because there are chances of data loss because of never replicating
+   * the data. Thus we should always try to ship the existing batch of entries here.
+   * If there was only one log in the queue before EOF, we ship the empty batch here
+   * and, since the reader is still active, in the next iteration of the reader we will
+   * stop the reader.
+   * If there was more than one log in the queue before EOF, we ship the existing batch
+   * and reset the wal path and position to the log with EOF, so the shipper can remove
+   * logs from the replication queue.
   * @return true only if the IOE can be handled
   */
-  private boolean handleEofException(IOException e) {
+  private boolean handleEofException(IOException e, WALEntryBatch batch)
+      throws InterruptedException {
    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
    // Dump the log even if logQueue size is 1 if the source is from recovered Source
    // since we don't add current log to recovered source queue so it is safe to remove.
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-        (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
+        && (source.isRecovered() || queue.size() > 1)
+        && this.eofAutoRecovery) {
+      Path head = queue.peek();
      try {
-        if (fs.getFileStatus(queue.peek()).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
+        if (fs.getFileStatus(head).getLen() == 0) {
+          // head of the queue is an empty log file
+          LOG.warn("Forcing removal of 0 length log in queue: {}", head);
          logQueue.remove(walGroupId);
          currentPosition = 0;
+          // After we removed the WAL from the queue, we should
+          // try shipping the existing batch of entries and set the wal position
+          // and path to the wal just dequeued to correctly remove logs from the zk
+          batch.setLastWalPath(head);
+          batch.setLastWalPosition(currentPosition);
+          addBatchToShippingQueue(batch);
          return true;
        }
      } catch (IOException ioe) {
@@ -270,6 +311,20 @@ class ReplicationSourceWALReader extends Thread {
    return false;
  }

+  /**
+   * Add the batch to the shipping queue.
+   * @param batch Batch of entries to ship
+   * @throws InterruptedException throws interrupted exception
+   * @throws IOException throws io exception from stream
+   */
+  private void addBatchToShippingQueue(WALEntryBatch batch)
+      throws InterruptedException, IOException {
+    // need to propagate the batch even if it has no entries since it may carry the last
+    // sequence id information for serial replication.
+    LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+    entryBatchQueue.put(batch);
+  }

  public Path getCurrentPath() {
    // if we've read some WAL entries, get the Path we read from
    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
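
The crux of the data-loss fix is the order of operations in handleEofException above: the zero-length log is dequeued first, then the partially filled batch is pointed at that log and shipped before the method returns true. A minimal restatement of that recovery path, using the same names as the diff above:

// Recovery path of handleEofException, restated (same fields as the diff above).
Path head = queue.peek();                      // the zero-length WAL that caused the EOF
if (fs.getFileStatus(head).getLen() == 0
    && (source.isRecovered() || queue.size() > 1) && eofAutoRecovery) {
  logQueue.remove(walGroupId);                 // dequeue the empty WAL
  currentPosition = 0;
  batch.setLastWalPath(head);                  // lets the shipper clear this log from zk
  batch.setLastWalPosition(currentPosition);
  addBatchToShippingQueue(batch);              // entries read before the EOF are preserved
}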

File: SerialReplicationSourceWALReader.java

@@ -50,7 +50,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
  }

  @Override
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
      throws IOException, InterruptedException {
    Path currentPath = entryStream.getCurrentPath();
    if (!entryStream.hasNext()) {
@@ -70,7 +70,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
      currentPath = entryStream.getCurrentPath();
    }
    long positionBefore = entryStream.getPosition();
-    WALEntryBatch batch = createBatch(entryStream);
+    batch = createBatch(entryStream);
    for (;;) {
      Entry entry = entryStream.peek();
      boolean doFiltering = true;

File: WALEntryBatch.java

@@ -94,6 +94,10 @@ class WALEntryBatch {
    return lastWalPath;
  }

+  public void setLastWalPath(Path lastWalPath) {
+    this.lastWalPath = lastWalPath;
+  }

  /**
   * @return the position in the last WAL that was read.
   */

File: WALEntryStream.java

@@ -80,7 +80,7 @@ class WALEntryStream implements Closeable {
   * @param walFileLengthProvider provides the length of the WAL file
   * @param serverName the server name which all WALs belong to
   * @param metrics the replication metrics
-   * @throws IOException
+   * @throws IOException throws an IO exception from the stream
   */
  public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
@@ -368,7 +368,9 @@ class WALEntryStream implements Closeable {
      handleFileNotFound(path, fnfe);
    } catch (RemoteException re) {
      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) throw ioe;
+      if (!(ioe instanceof FileNotFoundException)) {
+        throw ioe;
+      }
      handleFileNotFound(path, (FileNotFoundException)ioe);
    } catch (LeaseNotRecoveredException lnre) {
      // HBASE-15019 the WAL was not closed due to some hiccup.

File: TestReplicationBase.java

@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,8 +44,10 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -53,9 +55,9 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
@@ -87,6 +89,8 @@ public class TestReplicationBase {
      NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;
+  protected static AtomicInteger replicateCount = new AtomicInteger();
+  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
@@ -281,7 +285,8 @@ public class TestReplicationBase {
  public void setUpBase() throws Exception {
    if (!peerExist(PEER_ID2)) {
      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
-        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer());
+        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl(
+          ReplicationEndpointTest.class.getName());
      if (isSyncPeer()) {
        FileSystem fs2 = UTIL2.getTestFileSystem();
        // The remote wal dir is not important as we do not use it in DA state, here we only need to
@@ -378,4 +383,20 @@ public class TestReplicationBase {
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }

+  /**
+   * Custom replication endpoint to keep track of replication status for tests.
+   */
+  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
+    public ReplicationEndpointTest() {
+      replicateCount.set(0);
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      replicateCount.incrementAndGet();
+      replicatedEntries.addAll(replicateContext.getEntries());
+      return super.replicate(replicateContext);
+    }
+  }
}
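
Because setUpBase registers this endpoint via setReplicationEndpointImpl, tests can assert on both how much data was shipped and in how many batches. A typical assertion pair, mirroring the tests in TestReplicationEmptyWALRecovery below (expectedEntries and expectedBatches are per-test placeholders, not names from the patch):

// expectedEntries/expectedBatches are placeholders; each test computes its own.
Assert.assertEquals("Replicated entries are not correct",
    expectedEntries, replicatedEntries.size());
Assert.assertEquals("Replicated batches are not correct",
    expectedBatches, replicateCount.get());   // one increment per replicate() call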

File: TestReplicationEmptyWALRecovery.java

@@ -6,9 +6,7 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,56 +18,99 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
+  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);

  @Before
  public void setUp() throws IOException, InterruptedException {
    cleanUp();
+    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
+    replicateCount.set(0);
+    replicatedEntries.clear();
  }
/**
 * Waits until there is only one log (the current writing one) in the replication queue
- * @param numRs number of regionservers
+ *
+ * @param numRs number of region servers
 */
-private void waitForLogAdvance(int numRs) throws Exception {
-  Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
+private void waitForLogAdvance(int numRs) {
+  Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      for (int i = 0; i < numRs; i++) {
        HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
        RegionInfo regionInfo =
            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
        WAL wal = hrs.getWAL(regionInfo);
        Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
-        Replication replicationService = (Replication) UTIL1.getHBaseCluster()
-            .getRegionServer(i).getReplicationSourceService();
+        Replication replicationService =
+            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
        for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
            .getSources()) {
          ReplicationSource source = (ReplicationSource) rsi;
-          if (!currentFile.equals(source.getCurrentPath())) {
+          // We are making sure that there is only one log queue and that it is for the
+          // current WAL of the region server
+          String logPrefix = source.getQueues().keySet().stream().findFirst().get();
+          if (!currentFile.equals(source.getCurrentPath())
+              || source.getQueues().keySet().size() != 1
+              || source.getQueues().get(logPrefix).size() != 1) {
            return false;
          }
        }
      }
      return true;
    }
  });
}
private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
  Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() {
      for (int i = 0; i < numRs; i++) {
        Replication replicationService =
            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
        for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
            .getSources()) {
          ReplicationSource source = (ReplicationSource) rsi;
          String logPrefix = source.getQueues().keySet().stream().findFirst().get();
          if (source.getQueues().get(logPrefix).size() != numQueues) {
            return false;
          }
        }
      }
      return true;
    }
  });
}
@@ -82,13 +123,12 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
@Test
public void testEmptyWALRecovery() throws Exception {
  final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
  // for each RS, create an empty wal with same walGroupId
  final List<Path> emptyWalPaths = new ArrayList<>();
  long ts = System.currentTimeMillis();
  for (int i = 0; i < numRs; i++) {
    RegionInfo regionInfo =
        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
    WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
    Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());

@@ -97,10 +137,197 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
    emptyWalPaths.add(emptyWalPath);
  }

-  // inject our empty wal into the replication queue, and then roll the original wal, which
-  // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
-  // determine if the file being replicated currently is still opened for write, so just inject a
-  // new wal to the replication queue does not mean the previous file is closed.
+  injectEmptyWAL(numRs, emptyWalPaths);

+  // ReplicationSource should advance past the empty wal, or else the test will fail
+  waitForLogAdvance(numRs);
+  verifyNumberOfLogsInQueue(1, numRs);
+  // we're now writing to the new wal
+  // if everything works, the source should've stopped reading from the empty wal, and start
+  // replicating from the new wal
+  runSimplePutDeleteTest();
+  rollWalsAndWaitForDeque(numRs);
+}
/**
* Tests an empty WAL together with non-empty WALs in the same batch. This test makes sure
* that when we see the empty WAL and handle the EOF exception, we are able to ship the
* previous batch of entries without losing it. It also verifies the number of batches shipped.
*
* @throws Exception throws any exception
*/
@Test
public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
// Disable the replication peer to accumulate the non empty WAL followed by empty WAL
hbaseAdmin.disableReplicationPeer(PEER_ID2);
int numOfEntriesToReplicate = 20;
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
appendEntriesToWal(numOfEntriesToReplicate, wal);
wal.rollWriter();
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
}
injectEmptyWAL(numRs, emptyWalPaths);
// There should be three WALs in queue
// 1. empty WAL
// 2. non empty WAL
// 3. live WAL
//verifyNumberOfLogsInQueue(3, numRs);
hbaseAdmin.enableReplicationPeer(PEER_ID2);
// ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs);
// Now we should expect numOfEntriesToReplicate entries
// replicated from each region server. This makes sure we didn't lose data
// from any previous batch when we encountered the EOF exception for the empty file.
Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
replicatedEntries.size());
// We expect just one replication batch to be shipped, namely the one
// shipped when we handle the EOF exception.
Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
verifyNumberOfLogsInQueue(1, numRs);
// we're now writing to the new wal
// if everything works, the source should've stopped reading from the empty wal, and start
// replicating from the new wal
runSimplePutDeleteTest();
rollWalsAndWaitForDeque(numRs);
}
/**
* Tests an empty WAL together with non-empty WALs in the same batch. This test makes sure
* that when we see the empty WAL and handle the EOF exception, we are able to proceed
* with the next batch and replicate it properly without missing data.
*
* @throws Exception throws any exception
*/
@Test
public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
// Disable the replication peer to accumulate the non empty WAL followed by empty WAL
hbaseAdmin.disableReplicationPeer(PEER_ID2);
int numOfEntriesToReplicate = 20;
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
WAL wal = null;
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
appendEntriesToWal(numOfEntriesToReplicate, wal);
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
}
injectEmptyWAL(numRs, emptyWalPaths);
// roll the WAL now
for (int i = 0; i < numRs; i++) {
wal.rollWriter();
}
hbaseAdmin.enableReplicationPeer(PEER_ID2);
// ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs);
// Now we should expect numOfEntriesToReplicate entries
// replicated from each region server. This makes sure we didn't lose data
// from any previous batch when we encountered the EOF exception for the empty file.
Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
replicatedEntries.size());
// We expect just one replication batch to be shipped, namely the one
// for the non-empty WAL.
Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
verifyNumberOfLogsInQueue(1, numRs);
// we're now writing to the new wal
// if everything works, the source should've stopped reading from the empty wal, and start
// replicating from the new wal
runSimplePutDeleteTest();
rollWalsAndWaitForDeque(numRs);
}
/**
* This test makes sure we replicate all the entries from the non-empty WALs that
* surround the empty WALs.
*
* @throws Exception throws exception
*/
@Test
public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
// Disable the replication peer to accumulate the non empty WAL followed by empty WAL
hbaseAdmin.disableReplicationPeer(PEER_ID2);
int numOfEntriesToReplicate = 20;
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
WAL wal = null;
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
appendEntriesToWal(numOfEntriesToReplicate, wal);
wal.rollWriter();
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
}
injectEmptyWAL(numRs, emptyWalPaths);
// roll the WAL again with some entries
for (int i = 0; i < numRs; i++) {
appendEntriesToWal(numOfEntriesToReplicate, wal);
wal.rollWriter();
}
hbaseAdmin.enableReplicationPeer(PEER_ID2);
// ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs);
// Now we should expect numOfEntriesToReplicate entries
// replicated from each region server. This makes sure we didn't lose data
// from any previous batch when we encountered the EOF exception for the empty file.
Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
replicatedEntries.size());
// We expect two replication batches to be shipped, namely the ones
// for the non-empty WALs.
Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
verifyNumberOfLogsInQueue(1, numRs);
// we're now writing to the new wal
// if everything works, the source should've stopped reading from the empty wal, and start
// replicating from the new wal
runSimplePutDeleteTest();
rollWalsAndWaitForDeque(numRs);
}
+// inject our empty wal into the replication queue, and then roll the original wal, which
+// enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
+// determine if the file being replicated currently is still opened for write, so just inject a
+// new wal to the replication queue does not mean the previous file is closed.
+private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
  for (int i = 0; i < numRs; i++) {
    HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
    Replication replicationService = (Replication) hrs.getReplicationSourceService();

@@ -111,13 +338,32 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
    WAL wal = hrs.getWAL(regionInfo);
    wal.rollWriter(true);
  }
+}

-  // ReplicationSource should advance past the empty wal, or else the test will fail
-  waitForLogAdvance(numRs);
-
-  // we're now writing to the new wal
-  // if everything works, the source should've stopped reading from the empty wal, and start
-  // replicating from the new wal
-  runSimplePutDeleteTest();
-}
+protected WALKeyImpl getWalKeyImpl() {
+  return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
+}
+
+// Roll the WAL and wait for it to be dequeued from the log queue
+private void rollWalsAndWaitForDeque(int numRs) throws IOException {
+  RegionInfo regionInfo =
+      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+  for (int i = 0; i < numRs; i++) {
+    WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+    wal.rollWriter();
+  }
+  waitForLogAdvance(numRs);
+}
+
+private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
+  long txId = -1;
+  for (int i = 0; i < numEntries; i++) {
+    byte[] b = Bytes.toBytes(Integer.toString(i));
+    KeyValue kv = new KeyValue(b, famName, b);
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+    txId = wal.appendData(info, getWalKeyImpl(), edit);
+  }
+  wal.sync(txId);
+}
}

File: TestWALEntryStream.java

@@ -27,9 +27,9 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -83,7 +84,6 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {
@@ -687,6 +687,7 @@ public class TestWALEntryStream {
    // Override the max retries multiplier to fail fast.
    conf.setInt("replication.source.maxretriesmultiplier", 1);
    conf.setBoolean("replication.source.eof.autorecovery", true);
+    conf.setInt("replication.source.nb.batches", 10);
    // Create a reader thread with source as recovered source.
    ReplicationSource source = mockReplicationSource(true, conf);
    when(source.isPeerEnabled()).thenReturn(true);
@@ -705,7 +706,64 @@ public class TestWALEntryStream {
    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
  }
@Test
public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
Configuration conf = new Configuration(CONF);
MetricsSource metrics = mock(MetricsSource.class);
ReplicationSource source = mockReplicationSource(true, conf);
ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
// Create a 0 length log.
Path emptyLog = new Path(fs.getHomeDirectory(),"log.2");
FSDataOutputStream fsdos = fs.create(emptyLog);
fsdos.close();
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
final Path log1 = new Path(fs.getHomeDirectory(), "log.1");
WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
appendEntries(writer1, 3);
localLogQueue.enqueueLog(log1, fakeWalGroupId);
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
// Make it look like the source is from recovered source.
when(mockSourceManager.getOldSources())
.thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
// Override the max retries multiplier to fail fast.
conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.eof.autorecovery", true);
conf.setInt("replication.source.nb.batches", 10);
// Create a reader thread.
ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
getDummyFilter(), source, fakeWalGroupId);
assertEquals("Initial log queue size is not correct",
2, localLogQueue.getQueueSize(fakeWalGroupId));
reader.run();
// handleEofException should have removed the empty log from the log queue.
assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
}
private PriorityBlockingQueue<Path> getQueue() {
  return logQueue.getQueue(fakeWalGroupId);
}
private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
for (int i = 0; i < numEntries; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i));
KeyValue kv = new KeyValue(b,b,b);
WALEdit edit = new WALEdit();
edit.add(kv);
WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
HConstants.DEFAULT_CLUSTER_ID);
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
writer.append(new WAL.Entry(key, edit));
writer.sync(false);
}
writer.close();
}
} }