HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2975)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
commit 19fe18e466 (parent d37f734299)
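Note: the recovery path added by this commit is opt-in. A minimal sketch of enabling it, assuming a standard HBase Configuration (the flag name is taken verbatim from the test setup in this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EnableEofAutorecovery {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Let the WAL reader drop a zero-length log and move on instead of retrying
    // it forever (see ReplicationSourceWALReaderThread#handleEofException below).
    conf.setBoolean("replication.source.eof.autorecovery", true);
    System.out.println(conf.getBoolean("replication.source.eof.autorecovery", false));
  }
}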
@@ -221,6 +221,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
     }
   }
 
+  @InterfaceAudience.Private
+  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+    return logQueue.getQueues();
+  }
+
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
@@ -183,7 +183,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * position. It will also clean old logs from the queue.
    * @param log Path to the log currently being replicated from
    *          replication status in zookeeper. It will also delete older entries.
-   * @param id id of the peer cluster
+   * @param id id of the replication queue
    * @param position current location in the log
    * @param queueRecovered indicates if this queue comes from another region server
    * @param holdLogInZK if true then the log is retained in ZK
@@ -50,7 +50,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALKey;
 
 /**
- * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
+ * Reads and filters WAL entries, groups the filtered entries into batches,
+ * and puts the batches onto a queue
  *
  */
 @InterfaceAudience.Private
@@ -88,7 +89,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
    * @param manager replication manager
-   * @param replicationQueueInfo
+   * @param replicationQueueInfo replication queue info
    * @param logQueue The WAL queue to read off of
    * @param startPosition position in the first WAL to start reading from
    * @param fs the files system to use
@@ -135,71 +136,128 @@ public class ReplicationSourceWALReaderThread extends Thread {
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
-          }
-          if (!checkQuota()) {
-            continue;
-          }
-          WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
-          boolean hasNext;
-          while ((hasNext = entryStream.hasNext()) == true) {
-            Entry entry = entryStream.next();
-            entry = filterEntry(entry);
-            if (entry != null) {
-              WALEdit edit = entry.getEdit();
-              if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry, entrySize);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
-                // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
-                  break;
-                }
-              }
-            }
-          }
-          updateBatch(entryStream, batch, hasNext);
-          if (isShippable(batch)) {
-            sleepMultiplier = 1;
-            entryBatchQueue.put(batch);
-            if (!batch.hasMoreEntries()) {
-              // we're done with queue recovery, shut ourselves down
-              setReaderRunning(false);
-            }
-          } else {
-            Thread.sleep(sleepForRetries);
-          }
-          resetStream(entryStream);
-        }
-      } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
-        if (sleepMultiplier < maxRetriesMultiplier) {
-          LOG.debug("Failed to read stream of replication entries: " + e);
-          sleepMultiplier++;
-        } else {
-          LOG.error("Failed to read stream of replication entries", e);
-          handleEofException(e);
-        }
-        Threads.sleep(sleepForRetries * sleepMultiplier);
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while sleeping between WAL reads");
-        Thread.currentThread().interrupt();
-      }
-    }
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream =
+        new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId);
+    try {
+      while (isReaderRunning()) { // we only loop back here if something fatal happens to stream
+        try {
+          entryStream = new WALEntryStream(logQueue, fs, conf,
+            lastReadPosition, metrics, walGroupId);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+            batch = new WALEntryBatch(replicationBatchCountCapacity);
+            boolean hasNext = entryStream.hasNext();
+            while (hasNext) {
+              Entry entry = entryStream.next();
+              entry = filterEntry(entry);
+              if (entry != null) {
+                WALEdit edit = entry.getEdit();
+                if (edit != null && !edit.isEmpty()) {
+                  long entrySize = getEntrySizeIncludeBulkLoad(entry);
+                  long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
+                  batch.addEntry(entry, entrySize);
+                  updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+                  boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+                  // Stop if too many entries or too big
+                  if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+                      || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                    break;
+                  }
+                }
+              }
+              hasNext = entryStream.hasNext();
+            }
+
+            // If the batch has data to max capacity or stream doesn't have anything
+            // try to ship it
+            if (updateBatchAndShippingQueue(entryStream, batch, hasNext, false)) {
+              sleepMultiplier = 1;
+            }
+          }
+        } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+          if (handleEofException(e, entryStream, batch)) {
+            sleepMultiplier = 1;
+          } else {
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              LOG.debug("Failed to read stream of replication entries: " + e);
+              sleepMultiplier++;
+            } else {
+              LOG.error("Failed to read stream of replication entries", e);
+            }
+            Threads.sleep(sleepForRetries * sleepMultiplier);
+          }
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while sleeping between WAL reads");
+          Thread.currentThread().interrupt();
+        } finally {
+          entryStream.close();
+        }
+      }
+    } catch (IOException e) {
+      if (sleepMultiplier < maxRetriesMultiplier) {
+        LOG.debug("Failed to read stream of replication entries: " + e);
+        sleepMultiplier++;
+      } else {
+        LOG.error("Failed to read stream of replication entries", e);
+      }
+      Threads.sleep(sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.trace("Interrupted while sleeping between WAL reads");
+      Thread.currentThread().interrupt();
+    }
   }
 
-  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) {
+  /**
+   * Update the batch, try to ship it, and return true if it was shipped
+   * @param entryStream stream of the WALs
+   * @param batch batch of entries to ship
+   * @param hasMoreData whether the stream has yet more data to read
+   * @param isEOFException whether we have hit an EOF exception before this. For an EOF
+   *          exception, we do not want to reset the stream since the entry stream doesn't
+   *          have correct information.
+   * @return true if the batch was shipped successfully
+   * @throws InterruptedException throws interrupted exception
+   * @throws IOException throws io exception from stream
+   */
+  private boolean updateBatchAndShippingQueue(WALEntryStream entryStream, WALEntryBatch batch,
+      boolean hasMoreData, boolean isEOFException) throws InterruptedException, IOException {
+    updateBatch(entryStream, batch, hasMoreData, isEOFException);
+    boolean isDataQueued = false;
+    if (isShippable(batch)) {
+      isDataQueued = true;
+      entryBatchQueue.put(batch);
+      if (!batch.hasMoreEntries()) {
+        // we're done with queue recovery, shut ourselves down
+        LOG.debug("Stopping the reader after recovering the queue");
+        setReaderRunning(false);
+      }
+    } else {
+      Thread.sleep(sleepForRetries);
+    }
+
+    if (!isEOFException) {
+      resetStream(entryStream);
+    }
+    return isDataQueued;
+  }
+
+  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData,
+      boolean isEOFException) {
     logMessage(batch);
-    batch.updatePosition(entryStream);
+    // In case of EOF exception we can utilize the last read path and position
+    // since we do not have the current information.
+    if (isEOFException) {
+      batch.updatePosition(lastReadPath, lastReadPosition);
+    } else {
+      batch.updatePosition(entryStream.getCurrentPath(), entryStream.getPosition());
+    }
     batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData);
   }
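Note: the restructuring above trades try-with-resources for an explicit close in a finally block, so that the stream and the in-flight batch remain in scope inside the catch block where handleEofException() needs them. A standalone sketch of that control flow under stated assumptions (Stream and ship are stand-ins, not the HBase types):

import java.io.EOFException;
import java.io.IOException;
import java.util.function.Consumer;

public class ReaderLoopSketch {
  interface Stream extends AutoCloseable {
    boolean hasNext() throws IOException;
    String next() throws IOException;
    @Override void close() throws IOException;
  }

  static int readLoop(Stream stream, Consumer<String> ship) throws IOException {
    int shipped = 0; // stands in for the WALEntryBatch built so far
    try {
      while (stream.hasNext()) {
        ship.accept(stream.next());
        shipped++;
      }
    } catch (EOFException e) {
      // The partially built batch is still reachable here, so what was already read
      // can be shipped and the bad log dropped; the old try-with-resources shape lost
      // that access, retried forever, and left the data permanently unreplicated.
      System.out.println("recovered after " + shipped + " entries");
    } finally {
      stream.close(); // the explicit close that replaces the implicit one
    }
    return shipped;
  }
}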
@@ -229,10 +287,18 @@ public class ReplicationSourceWALReaderThread extends Thread {
     stream.reset(); // reuse stream
   }
 
-  // if we get an EOF due to a zero-length log, and there are other logs in queue
-  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
-  // enabled, then dump the log
-  private void handleEofException(Exception e) {
+  /**
+   * This is to handle the EOFException from the WAL entry stream. EOFException should
+   * be handled carefully because there are chances of data loss because of never replicating
+   * the data.
+   * If the EOFException happens on the last log in a recovered queue, we can safely stop
+   * the reader.
+   * If the EOFException does not happen on the last log in a recovered queue, we should
+   * not stop the reader.
+   * @return true only if the IOE can be handled
+   */
+  private boolean handleEofException(Exception e, WALEntryStream entryStream,
+      WALEntryBatch batch) throws InterruptedException {
+    boolean isRecoveredSource = manager.getOldSources().contains(source);
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't
@@ -245,11 +311,22 @@ public class ReplicationSourceWALReaderThread extends Thread {
           lastReadPath = queue.peek();
           logQueue.remove(walGroupId);
           lastReadPosition = 0;
+
+          // If it was on the last log in the recovered queue,
+          // the stream doesn't have more data, we should stop the reader
+          boolean hasMoreData = !queue.isEmpty();
+          // After we removed the WAL from the queue, we should
+          // try shipping the existing batch of entries, we do not want to reset
+          // stream since entry stream doesn't have the correct data at this point
+          updateBatchAndShippingQueue(entryStream, batch, hasMoreData, true);
+          return true;
         }
       } catch (IOException ioe) {
         LOG.warn("Couldn't get file length information about log " + queue.peek());
       }
     }
+
+    return false;
   }
 
   public Path getCurrentPath() {
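Note: condensing the comments above, the EOF is only treated as recoverable under a narrow condition. A hedged paraphrase as a standalone predicate (names are illustrative, not the HBase API):

public class EofRecoveryRule {
  // A recovered (dead server) queue must drain fully, so its last log may be dumped;
  // an active source keeps its newest log because writes may still be appended to it.
  static boolean canDumpHeadLog(boolean autoRecoveryEnabled, boolean isRecoveredSource,
      int queueSize, long headLogLength) {
    if (!autoRecoveryEnabled || headLogLength != 0) {
      return false;
    }
    return isRecoveredSource || queueSize > 1;
  }

  public static void main(String[] args) {
    System.out.println(canDumpHeadLog(true, true, 1, 0L));  // true: recovered source, last log
    System.out.println(canDumpHeadLog(true, false, 1, 0L)); // false: active source keeps it
  }
}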
@@ -299,7 +376,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
     return edit.heapSize() + key.estimatedSerializedSizeOf();
   }
 
-  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
+  private void updateBatchStats(WALEntryBatch batch, Entry entry,
+      long entryPosition, long entrySize) {
     WALEdit edit = entry.getEdit();
     if (edit != null && !edit.isEmpty()) {
       batch.incrementHeapSize(entrySize);
@@ -409,7 +487,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
    * Holds a batch of WAL entries to replicate, along with some statistics
    *
    */
-  static class WALEntryBatch {
+  final static class WALEntryBatch {
     private List<Pair<Entry, Long>> walEntriesWithSize;
     // last WAL that was read
     private Path lastWalPath;
@@ -515,9 +593,15 @@ public class ReplicationSourceWALReaderThread extends Thread {
       return walEntriesWithSize.isEmpty();
     }
 
-    public void updatePosition(WALEntryStream entryStream) {
-      lastWalPath = entryStream.getCurrentPath();
-      lastWalPosition = entryStream.getPosition();
+    /**
+     * Update the wal entry batch with the latest wal and position, which will be used by
+     * the shipper to update the log position in the ZK node
+     * @param currentPath the path of the WAL
+     * @param currentPosition the position of the WAL
+     */
+    public void updatePosition(Path currentPath, long currentPosition) {
+      lastWalPath = currentPath;
+      lastWalPosition = currentPosition;
     }
 
     public boolean hasMoreEntries() {
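Note: the batch marker recorded above is what the shipper side ultimately persists. A hedged sketch of that consumer, assuming the WALEntryBatch getters getLastWalPath()/getLastWalPosition() and the logPositionAndCleanOldLogs() signature documented earlier in this commit (the surrounding method and its parameters are illustrative):

void recordProgress(ReplicationSourceManager manager, WALEntryBatch batch, String queueId,
    boolean queueRecovered) {
  // Persist whatever path/position updatePosition() recorded; after an EOF this is
  // the last successfully read marker rather than the dead stream's cursor.
  manager.logPositionAndCleanOldLogs(batch.getLastWalPath(), queueId,
    batch.getLastWalPosition(), queueRecovered, false);
}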
@@ -72,25 +72,23 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
    * @param conf {@link Configuration} to use to create {@link Reader} for this stream
    * @param metrics replication metrics
    * @param walGroupId wal prefix
-   * @throws IOException
    */
   public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
-      MetricsSource metrics, String walGroupId)
-      throws IOException {
+      MetricsSource metrics, String walGroupId) {
     this(logQueue, fs, conf, 0, metrics, walGroupId);
   }
 
   /**
    * Create an entry stream over the given queue at the given start position
    * @param logQueue the queue of WAL paths
    * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
    * @param conf the {@link Configuration} to use to create {@link Reader} for this stream
    * @param startPosition the position in the first WAL to start reading at
    * @param metrics the replication metrics
    * @param walGroupId wal prefix
-   * @throws IOException
    */
   public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
-      long startPosition, MetricsSource metrics, String walGroupId) throws IOException {
+      long startPosition, MetricsSource metrics, String walGroupId) {
     this.logQueue = logQueue;
     this.fs = fs;
     this.conf = conf;
@@ -122,7 +120,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
    */
   @Override
   public Entry next() {
-    if (!hasNext()) throw new NoSuchElementException();
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
     Entry save = currentEntry;
     currentEntry = null; // gets reloaded by hasNext()
     return save;
@@ -180,7 +180,7 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
   /**
    * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
    * false)
-   * @throws IOException
+   * @throws IOException io exception while resetting the reader
    */
   public void reset() throws IOException {
     if (reader != null && currentPath != null) {
@@ -306,7 +306,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
     Path nextPath = queue.peek();
     if (nextPath != null) {
       openReader(nextPath);
-      if (reader != null) return true;
+      if (reader != null) {
+        return true;
+      }
     }
     return false;
   }
@@ -349,7 +351,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
       handleFileNotFound(path, fnfe);
     } catch (RemoteException re) {
       IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) throw ioe;
+      if (!(ioe instanceof FileNotFoundException)) {
+        throw ioe;
+      }
       handleFileNotFound(path, (FileNotFoundException)ioe);
     } catch (LeaseNotRecoveredException lnre) {
       // HBASE-15019 the WAL was not closed due to some hiccup.
@@ -34,6 +34,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -46,10 +48,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@@ -75,6 +77,7 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSource
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.ByteStringer;
@@ -86,6 +89,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider;
 
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -125,6 +129,7 @@ public class TestReplicationSource {
     if (FS.exists(logDir)) {
       FS.delete(logDir, true);
     }
+    conf.setBoolean("replication.source.eof.autorecovery", true);
   }
 
   @Before
@@ -244,7 +249,6 @@ public class TestReplicationSource {
     }
 
     });
-
   }
 
   private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
@@ -295,12 +299,16 @@ public class TestReplicationSource {
       Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
     }
 
-    ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
-        throws IOException {
+    ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint,
+        boolean isRecovered) throws IOException {
       final ReplicationSource source = new ReplicationSource();
       endpoint.init(context);
       source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
-          "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+        "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+      if (isRecovered) {
+        when(manager.getOldSources())
+          .thenReturn(Lists.<ReplicationSourceInterface>newArrayList(source));
+      }
       return source;
     }
@@ -321,48 +329,54 @@ public class TestReplicationSource {
   @Test
   public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws Exception {
     final int numWALEntries = 5;
-    conf.setInt("replication.source.nb.capacity", numWALEntries);
-
-    Mocks mocks = new Mocks();
-    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
-      @Override
-      public WALEntryFilter getWALEntryfilter() {
-        return null;
-      }
-    };
-    WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
-    final Path log1 = new Path(logDir, "log.1");
-    final Path log2 = new Path(logDir, "log.2");
-
-    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
-    WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
-
-    appendEntries(writer1, 3);
-    appendEntries(writer2, 2);
-
-    long pos = getPosition(wals, log2, 2);
-
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
-    source.run();
-
-    source.enqueueLog(log1);
-    // log rolled
-    source.enqueueLog(log2);
-
-    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-      @Override public boolean evaluate() throws Exception {
-        return endpoint.replicateCount.get() > 0;
-      }
-    });
-
-    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
-    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
-    verify(mocks.manager, times(1))
-      .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
-        anyBoolean(), anyBoolean());
-    assertTrue(endpoint.lastEntries.size() == 5);
-    assertThat(pathCaptor.getValue(), is(log2));
-    assertThat(positionCaptor.getValue(), is(pos));
+    int nbCapacity = conf.getInt("replication.source.nb.capacity", 25000);
+    try {
+      conf.setInt("replication.source.nb.capacity", numWALEntries);
+
+      Mocks mocks = new Mocks();
+      final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+        @Override public WALEntryFilter getWALEntryfilter() {
+          return null;
+        }
+      };
+      WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
+      final Path log1 = new Path(logDir, "log.1");
+      final Path log2 = new Path(logDir, "log.2");
+
+      WALProvider.Writer writer1
+        = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+      WALProvider.Writer writer2
+        = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
+
+      appendEntries(writer1, 3);
+      appendEntries(writer2, 2);
+
+      long pos = getPosition(wals, log2, 2);
+
+      final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
+      source.run();
+
+      source.enqueueLog(log1);
+      // log rolled
+      source.enqueueLog(log2);
+
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() {
+          return endpoint.replicateCount.get() > 0;
+        }
+      });
+
+      ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+      ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+      verify(mocks.manager, times(1))
+        .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
+          anyBoolean(), anyBoolean());
+      assertTrue(endpoint.lastEntries.size() == 5);
+      assertThat(pathCaptor.getValue(), is(log2));
+      assertThat(positionCaptor.getValue(), is(pos));
+    } finally {
+      conf.setInt("replication.source.nb.capacity", nbCapacity);
+    }
   }
 
   @Test
@@ -405,7 +419,7 @@ public class TestReplicationSource {
     writer.close();
 
     Mocks mocks = new Mocks();
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     source.run();
 
     source.enqueueLog(log);
@@ -423,7 +437,7 @@ public class TestReplicationSource {
     Mocks mocks = new Mocks();
 
     final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
 
     final Path log1 = new Path(logDir, "log.1");
@@ -475,7 +489,7 @@ public class TestReplicationSource {
     final long pos = getPosition(wals, log2, 2);
 
     final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     source.enqueueLog(log1);
     source.enqueueLog(log2);
     source.run();
@@ -529,7 +543,7 @@ public class TestReplicationSource {
     }
   };
 
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     source.run();
     source.enqueueLog(log1);
@@ -556,6 +570,173 @@ public class TestReplicationSource {
     });
   }
 
+  @Test
+  public void testReplicationOnEmptyLogAtTheEndOfQueueWithMultipleLogs() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Append 3 entries in a log
+    final Path log1 = new Path(logDir, logPrefix + ".1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+
+    // Create a 0 length log.
+    Path emptyLog = new Path(logDir, logPrefix + ".2");
+    FSDataOutputStream fsdos = FS.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+    source.run();
+    source.enqueueLog(log1);
+    source.enqueueLog(emptyLog);
+
+    // Wait for source to replicate
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.replicateCount.get() == 1;
+      }
+    });
+
+    // Wait and verify that all the entries from the non-empty logs get replicated
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.lastEntries.size() == 3;
+      }
+    });
+
+    // Wait and verify that the log queue has been drained fully
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return source.getQueues().get(logPrefix).isEmpty();
+      }
+    });
+  }
+
+  @Test
+  public void testReplicationOnEmptyLogAtTheEndOfQueueWithSingleLog() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Create a 0 length log.
+    Path emptyLog = new Path(logDir, logPrefix + ".1");
+    FSDataOutputStream fsdos = FS.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+    source.run();
+    source.enqueueLog(emptyLog);
+
+    // Wait and verify that no entry got replicated
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.lastEntries == null;
+      }
+    });
+
+    // Wait and verify that the log queue is empty
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return source.getQueues().get(logPrefix).isEmpty();
+      }
+    });
+  }
+
+  @Test
+  public void testReplicationOnEmptyLogBetweenTheNonEmptyLogsInLogQueue() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Append 3 entries in a log
+    final Path log1 = new Path(logDir, logPrefix + ".11");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+
+    // Create a 0 length log.
+    Path emptyLog = new Path(logDir, logPrefix + ".12");
+    FSDataOutputStream fsdos = FS.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+    // Append 5 entries in a log
+    final Path log3 = new Path(logDir, logPrefix + ".13");
+    WALProvider.Writer writer3 = WALFactory.createWALWriter(FS, log3, TEST_UTIL.getConfiguration());
+    appendEntries(writer3, 5);
+
+    // Append 10 entries in a log
+    final Path log4 = new Path(logDir, logPrefix + ".14");
+    WALProvider.Writer writer4 = WALFactory.createWALWriter(FS, log4, TEST_UTIL.getConfiguration());
+    appendEntries(writer4, 10);
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+    source.run();
+    source.enqueueLog(log1);
+    source.enqueueLog(emptyLog);
+    source.enqueueLog(log3);
+    source.enqueueLog(log4);
+
+    // Wait for source to replicate
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.replicateCount.get() == 2;
+      }
+    });
+
+    // Wait and verify the last replicated entries
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.lastEntries.size() == 15;
+      }
+    });
+
+    // Wait and verify that only one log is left in the queue
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return source.getQueues().get(logPrefix).size() == 1;
+      }
+    });
+  }
+
   /**
    * Tests that recovered queues are preserved on a regionserver shutdown.
    * See HBASE-18192
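Note: all three tests above manufacture the failure with a zero-length WAL. A minimal sketch of that setup against the local filesystem, using only standard Hadoop FileSystem calls:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ZeroLengthLogSetup {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path emptyLog = new Path(System.getProperty("java.io.tmpdir"), "log.2");
    fs.create(emptyLog).close(); // zero bytes written: reading this as a WAL yields EOF
    System.out.println(fs.getFileStatus(emptyLog).getLen()); // prints 0
  }
}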
@@ -76,7 +76,6 @@ public abstract class TestReplicationSourceBase {
   protected static DummyServer server;
 
   @BeforeClass public static void setUpBeforeClass() throws Exception {
-
     conf = HBaseConfiguration.create();
     conf.set("replication.replicationsource.implementation",
       ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
|
@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.wal.WAL;
|
|||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@@ -741,9 +742,45 @@ public class TestWALEntryStream {
       new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
         localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
     reader.run();
     assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
   }
 
+  @Test
+  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
+    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, getMockMetrics());
+    // Create a 0 length log.
+    Path emptyLog = new Path("log.2");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
+
+    final Path log1 = new Path("log.1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+    localLogQueue.enqueueLog(log1, fakeWalGroupId);
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    // Make it look like the source is from recovered source.
+    when(mockSourceManager.getOldSources())
+      .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread.
+    ReplicationSourceWALReaderThread reader =
+      new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
+        localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
+    assertEquals("Initial log queue size is not correct",
+      2, localLogQueue.getQueueSize(fakeWalGroupId));
+    reader.run();
+
+    // ReplicationSourceWALReaderThread#handleEofException method will
+    // remove empty log from logQueue.
+    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
+  }
+
   private PriorityBlockingQueue<Path> getQueue() {
@@ -757,4 +794,21 @@ public class TestWALEntryStream {
     doNothing().when(source).setOldestWalAge(Mockito.anyInt());
     return source;
   }
+
+  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
+    for (int i = 0; i < numEntries; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b, b, b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
+        HConstants.DEFAULT_CLUSTER_ID);
+      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+      key.setScopes(scopes);
+      writer.append(new WAL.Entry(key, edit));
+      writer.sync(false);
+    }
+    writer.close();
+  }
 }