HBASE-15995 Separate replication WAL reading from shipping

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Vincent 2017-02-06 11:06:50 -08:00 committed by tedyu
parent 8088aa3733
commit c55fce00f3
7 changed files with 1606 additions and 996 deletions

View File

@ -0,0 +1,70 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.UUID;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
 * WAL entry filter that drops entries which already carry the peer cluster's id (they have
 * been consumed by that cluster) and stamps every surviving entry with this cluster's id.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
@InterfaceStability.Evolving
public class ClusterMarkingEntryFilter implements WALEntryFilter {
  private UUID clusterId;
  private UUID peerClusterId;
  private ReplicationEndpoint replicationEndpoint;

  /**
   * @param clusterId id of this cluster
   * @param peerClusterId id of the other cluster
   * @param replicationEndpoint ReplicationEndpoint which will handle the actual replication
   */
  public ClusterMarkingEntryFilter(UUID clusterId, UUID peerClusterId,
      ReplicationEndpoint replicationEndpoint) {
    this.clusterId = clusterId;
    this.peerClusterId = peerClusterId;
    this.replicationEndpoint = replicationEndpoint;
  }

  /**
   * @param entry the WAL entry to inspect
   * @return the same entry, marked with this cluster's id and with its compression context
   *         cleared, or null if the entry must not be replicated
   */
  @Override
  public Entry filter(Entry entry) {
    // The peer has already seen this entry, and the endpoint does not replicate back to the
    // same cluster: nothing to ship.
    boolean alreadyConsumedByPeer = !replicationEndpoint.canReplicateToSameCluster()
        && entry.getKey().getClusterIds().contains(peerClusterId);
    if (alreadyConsumedByPeer) {
      return null;
    }

    WALEdit walEdit = entry.getEdit();
    WALKey walKey = entry.getKey();
    if (walEdit == null || walEdit.isEmpty()) {
      return null;
    }

    // Record that the current cluster has the change
    walKey.addClusterId(clusterId);
    // The CC must be null, otherwise the entry would be compressed when sent to the sink
    entry.setCompressionContext(null);
    return entry;
  }
}

View File

@ -0,0 +1,471 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicationSourceWALReaderThread extends Thread {
private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class);
private PriorityBlockingQueue<Path> logQueue;
private FileSystem fs;
private Configuration conf;
private BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
private int replicationBatchCountCapacity;
// position in the WAL to start reading at
private long currentPosition;
private WALEntryFilter filter;
private long sleepForRetries;
//Indicates whether this particular worker is running
private boolean isReaderRunning = true;
private ReplicationQueueInfo replicationQueueInfo;
private int maxRetriesMultiplier;
private MetricsSource metrics;
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
/**
 * Builds a reader worker for one WAL queue. The worker reads WAL entries from the queue,
 * groups them into batches and places the batches on an internal bounded batch queue.
 * @param manager replication manager, used to obtain the shared total-buffer-used counter
 * @param replicationQueueInfo information about the replication queue being read
 * @param logQueue The WAL queue to read off of
 * @param startPosition position in the first WAL to start reading from
 * @param fs the files system to use
 * @param conf configuration to use
 * @param filter The filter to use while reading
 * @param metrics replication metrics
 */
public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
    ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
    long startPosition,
    FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
  // collaborators handed in by the caller
  this.replicationQueueInfo = replicationQueueInfo;
  this.logQueue = logQueue;
  this.fs = fs;
  this.conf = conf;
  this.filter = filter;
  this.metrics = metrics;
  this.currentPosition = startPosition;
  this.totalBufferUsed = manager.getTotalBufferUsed();

  // per-batch limits
  this.replicationBatchSizeCapacity =
      conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
  this.replicationBatchCountCapacity = conf.getInt("replication.source.nb.capacity", 25000);

  // global limit shared through totalBufferUsed
  this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);

  // retry behaviour
  this.sleepForRetries = conf.getLong("replication.source.sleepforretries", 1000); // 1 second
  this.maxRetriesMultiplier =
      conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per

  // memory used will be batchSizeCapacity * (nb.batches + 1);
  // the +1 accounts for the batch this thread is filling before it reaches the queue
  final int batchQueueCapacity = conf.getInt("replication.source.nb.batches", 1);
  this.entryBatchQueue = new LinkedBlockingQueue<>(batchQueueCapacity);

  LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
      + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
      + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
      + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
      + ", replicationBatchQueueCapacity=" + batchQueueCapacity);
}
/**
* Main loop of the reader. Opens a {@link WALEntryStream} at {@code currentPosition} and, while
* the reader is running, repeatedly builds a {@link WALEntryBatch} from the stream and puts it
* on {@code entryBatchQueue}. A batch is closed when the stream has no more entries, when
* {@code updateSerialReplPos} asks to stop, or when the per-batch size/count limit or the total
* buffer quota is reached. When nothing was read, a recovered queue stops the reader and
* enqueues a final (possibly empty) batch; otherwise the thread sleeps {@code sleepForRetries}.
* Stream failures are retried with a growing sleep and a freshly created stream.
*/
@Override
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
// checkQuota() sleeps before returning false, so this is not a busy loop
if (!checkQuota()) {
continue;
}
WALEntryBatch batch = null;
while (entryStream.hasNext()) {
// the batch is created lazily, only once the stream actually has an entry
if (batch == null) {
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
}
Entry entry = entryStream.next();
// a true result means we must stop the batch here (REGION_CLOSE in serial replication)
if (updateSerialReplPos(batch, entry)) {
batch.lastWalPosition = entryStream.getPosition();
break;
}
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
batch.addEntry(entry);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
// adds entrySize to the shared counter; true once the total quota is reached
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
}
}
}
}
// ship the batch if it carries entries or serial-replication sequence ids
if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Read %s WAL entries eligible for replication",
batch.getNbEntries()));
}
entryBatchQueue.put(batch);
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
LOG.trace("Didn't read any new entries from WAL");
if (replicationQueueInfo.isQueueRecovered()) {
// we're done with queue recovery, shut ourself down
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(batch != null ? batch
: new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
} else {
Thread.sleep(sleepForRetries);
}
}
// remember where we are so a re-created stream resumes from here
currentPosition = entryStream.getPosition();
entryStream.reset(); // reuse stream
}
} catch (IOException | WALEntryStreamRuntimeException e) { // stream related
// log quietly while backing off; switch to error level once the multiplier is maxed out
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
// restore the interrupt flag; isReaderRunning() then returns false and the loop ends
Thread.currentThread().interrupt();
}
}
}
// Returns true while the shared buffer usage is within the total quota. When the quota is
// already exceeded, sleeps for one retry interval and returns false.
private boolean checkQuota() {
  boolean withinQuota = totalBufferUsed.get() <= totalBufferQuota;
  if (!withinQuota) {
    // back off so we do not push usage even further over the quota
    Threads.sleep(sleepForRetries);
  }
  return withinQuota;
}
// Runs the configured filter over the entry and counts it in the metrics when the filter
// dropped a non-null entry. Returns whatever the filter returned (possibly null).
private Entry filterEntry(Entry entry) {
  Entry kept = filter.filter(entry);
  boolean droppedByFilter = kept == null && entry != null;
  if (droppedByFilter) {
    metrics.incrLogEditsFiltered();
  }
  return kept;
}
/**
 * Records the entry's sequence id for its region in the batch when the entry has serial
 * replication scope, and detects a REGION_CLOSE marker.
 * @param batch the batch whose per-region last sequence ids are updated
 * @param entry the WAL entry just read
 * @return true if we should stop reading because we're at REGION_CLOSE
 * @throws IOException if the region event descriptor cannot be read from the first cell
 */
private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
  if (!entry.hasSerialReplicationScope()) {
    return false;
  }
  String regionKey = Bytes.toString(entry.getKey().getEncodedRegionName());
  batch.setLastPosition(regionKey, entry.getKey().getSequenceId());

  if (entry.getEdit().getCells().isEmpty()) {
    return false;
  }
  WALProtos.RegionEventDescriptor regionEvent =
      WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
  boolean isRegionClose = regionEvent != null
      && regionEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE;
  if (!isRegionClose) {
    return false;
  }
  // In serial replication, if a region is moved to another RS and then moved back, we may
  // read logs spanning two sections. Break at REGION_CLOSE and push the first section first,
  // so the middle section that belongs to the other RS is not skipped.
  // Within a worker thread, once the first log of a region can be pushed, all logs of that
  // region can be pushed without waiting for a close marker, because the next logs read for
  // this region must belong to a new, non-adjacent section. Mark it negative.
  batch.setLastPosition(regionKey, -entry.getKey().getSequenceId());
  return true;
}
/**
* Retrieves the next batch of WAL entries from the queue, blocking until a batch becomes
* available. No timeout is applied: this delegates directly to
* {@link BlockingQueue#take()}.
* @return A batch of entries, along with the position in the log after reading the batch
* @throws InterruptedException if interrupted while waiting
*/
public WALEntryBatch take() throws InterruptedException {
return entryBatchQueue.take();
}
// Size charged against the buffer quotas for one entry: the heap size of its edit plus the
// size of any store files referenced by bulk load cells in that edit.
private long getEntrySize(Entry entry) {
  WALEdit walEdit = entry.getEdit();
  long editHeapSize = walEdit.heapSize();
  long bulkLoadedBytes = calculateTotalSizeOfStoreFiles(walEdit);
  return editHeapSize + bulkLoadedBytes;
}
// Folds one entry into the batch statistics (heap size, distinct row keys, HFile count) when
// its edit is non-empty, and always records the WAL position reached after the entry.
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition,
    long entrySize) {
  WALEdit walEdit = entry.getEdit();
  boolean hasContent = walEdit != null && !walEdit.isEmpty();
  if (hasContent) {
    batch.incrementHeapSize(entrySize);
    Pair<Integer, Integer> rowsAndHFiles = countDistinctRowKeysAndHFiles(walEdit);
    batch.incrementNbRowKeys(rowsAndHFiles.getFirst());
    batch.incrementNbHFiles(rowsAndHFiles.getSecond());
  }
  batch.lastWalPosition = entryPosition;
}
/**
 * Count the number of different row keys in the given edit because of mini-batching, and the
 * number of HFiles referenced by its bulk load cells. We assume that there's at least one Cell
 * in the WALEdit; an empty edit makes {@code cells.get(0)} throw.
 * @param edit edit to count row keys from
 * @return pair of (number of different row keys, number of HFiles)
 */
private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
  List<Cell> cells = edit.getCells();
  int distinctRowKeys = 1;
  int totalHFileEntries = 0;
  Cell lastCell = cells.get(0);

  int totalCells = edit.size();
  for (int i = 0; i < totalCells; i++) {
    Cell cell = cells.get(i);
    // Count HFiles to be replicated
    if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      try {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        for (StoreDescriptor store : bld.getStoresList()) {
          totalHFileEntries += store.getStoreFileList().size();
        }
      } catch (IOException e) {
        // Best effort: the metric is simply not updated for this cell. Pass the exception to
        // the logger so the cause is not lost.
        LOG.error("Failed to deserialize bulk load entry from wal edit. "
            + "Then its hfiles count will not be added into metric.", e);
      }
    }

    // only consecutive cells are compared, so a row counts again if it reappears later
    if (!CellUtil.matchingRows(cell, lastCell)) {
      distinctRowKeys++;
    }
    lastCell = cell;
  }

  return new Pair<>(distinctRowKeys, totalHFileEntries);
}
/**
 * Calculate the total size of all the store files referenced by bulk load cells in the edit.
 * The sum is kept in a {@code long}: accumulating into an {@code int} with {@code +=} would
 * silently narrow each addition and could overflow for large store files.
 * @param edit edit whose bulk load cells are inspected
 * @return the total size of the store files, in the unit reported by
 *         {@code StoreDescriptor#getStoreFileSizeBytes()}
 */
private long calculateTotalSizeOfStoreFiles(WALEdit edit) {
  List<Cell> cells = edit.getCells();
  long totalStoreFilesSize = 0L;

  int totalCells = edit.size();
  for (int i = 0; i < totalCells; i++) {
    Cell cell = cells.get(i);
    if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      continue;
    }
    try {
      BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
      for (StoreDescriptor store : bld.getStoresList()) {
        totalStoreFilesSize += store.getStoreFileSizeBytes();
      }
    } catch (IOException e) {
      // Best effort: a cell that cannot be decoded contributes nothing to the size.
      LOG.error("Failed to deserialize bulk load entry from wal edit. "
          + "Size of HFiles part of cell will not be considered in replication "
          + "request size calculation.",
        e);
    }
  }
  return totalStoreFilesSize;
}
/**
 * Adds the given size to the shared total-buffer-used counter.
 * @param size delta size for grown buffer
 * @return true if we should clear buffer and push all, i.e. the counter has reached or
 *         exceeded the total buffer quota after the addition
 */
private boolean acquireBufferQuota(long size) {
  long usedAfterGrowth = totalBufferUsed.addAndGet(size);
  return usedAfterGrowth >= totalBufferQuota;
}
/**
 * @return whether the reader thread is running: the running flag is set and this thread has
 *         not been interrupted
 */
public boolean isReaderRunning() {
  if (!isReaderRunning) {
    return false;
  }
  return !isInterrupted();
}
/**
* Sets the flag consulted by {@link #isReaderRunning()}; passing false makes the loops in
* {@link #run()} stop at their next check.
* NOTE(review): the backing field is a plain (non-volatile) boolean. If this setter is invoked
* from a thread other than the reader itself, visibility of the write is not guaranteed by the
* Java memory model - confirm against callers.
* @param readerRunning the readerRunning to set
*/
public void setReaderRunning(boolean readerRunning) {
this.isReaderRunning = readerRunning;
}
/**
 * Holds a batch of WAL entries to replicate, along with some statistics about the batch
 * (last WAL path and position, distinct row keys, HFile count, heap size) and, for
 * serial-replication tables, the last sequence id seen per region.
 */
static class WALEntryBatch {
  private final List<Entry> walEntries;
  // last WAL that was read
  private final Path lastWalPath;
  // position in WAL of last entry in this batch
  private long lastWalPosition = 0;
  // number of distinct row keys in this batch
  private int nbRowKeys = 0;
  // number of HFiles
  private int nbHFiles = 0;
  // heap size of data we need to replicate
  private long heapSize = 0;
  // save the last sequenceid for each region if the table has serial-replication scope
  private final Map<String, Long> lastSeqIds = new HashMap<>();

  /**
   * @param maxNbEntries initial capacity of the entry list (the list itself is not bounded)
   * @param lastWalPath Path of the WAL the last entry in this batch was read from
   */
  private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
    this.walEntries = new ArrayList<>(maxNbEntries);
    this.lastWalPath = lastWalPath;
  }

  /**
   * Appends an entry to this batch. Statistics are not updated here.
   * @param entry the WAL entry to add
   */
  public void addEntry(Entry entry) {
    walEntries.add(entry);
  }

  /**
   * @return the WAL Entries (the live internal list, not a copy).
   */
  public List<Entry> getWalEntries() {
    return walEntries;
  }

  /**
   * @return the path of the last WAL that was read.
   */
  public Path getLastWalPath() {
    return lastWalPath;
  }

  /**
   * @return the position in the last WAL that was read.
   */
  public long getLastWalPosition() {
    return lastWalPosition;
  }

  /**
   * @return the number of WAL entries currently held in this batch
   */
  public int getNbEntries() {
    return walEntries.size();
  }

  /**
   * @return the number of distinct row keys in this batch
   */
  public int getNbRowKeys() {
    return nbRowKeys;
  }

  /**
   * @return the number of HFiles in this batch
   */
  public int getNbHFiles() {
    return nbHFiles;
  }

  /**
   * @return total number of operations in this batch (distinct row keys plus HFiles)
   */
  public int getNbOperations() {
    return getNbRowKeys() + getNbHFiles();
  }

  /**
   * @return the heap size of this batch
   */
  public long getHeapSize() {
    return heapSize;
  }

  /**
   * @return the last sequenceid for each region if the table has serial-replication scope
   *         (the live internal map, not a copy)
   */
  public Map<String, Long> getLastSeqIds() {
    return lastSeqIds;
  }

  private void incrementNbRowKeys(int increment) {
    nbRowKeys += increment;
  }

  private void incrementNbHFiles(int increment) {
    nbHFiles += increment;
  }

  private void incrementHeapSize(long increment) {
    heapSize += increment;
  }

  // Stores (or overwrites) the last sequence id recorded for the given region key.
  private void setLastPosition(String region, Long sequenceId) {
    getLastSeqIds().put(region, sequenceId);
  }
}
}

View File

@ -1,155 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import java.io.IOException;
/**
 * Thin helper around a WAL {@link Reader} that hides implementation details such as
 * compression, and remembers the position reached in the file currently being read.
 */
@InterfaceAudience.Private
public class ReplicationWALReaderManager {

  private static final Log LOG = LogFactory.getLog(ReplicationWALReaderManager.class);
  private final FileSystem fs;
  private final Configuration conf;
  private long position = 0;
  private Reader reader;
  private Path lastPath;

  /**
   * Creates the helper without opening any file.
   * Call {@link #setPosition(long)} afterwards if some content needs to be skipped.
   * @param fs file system used to create readers
   * @param conf configuration used to create readers
   */
  public ReplicationWALReaderManager(FileSystem fs, Configuration conf) {
    this.fs = fs;
    this.conf = conf;
  }

  /**
   * Returns a reader for the given path. If a reader is already open on that same path it is
   * reset so that newly appended data becomes visible; otherwise any open reader is closed
   * and a new one is created.
   * @param path the WAL file to read
   * @return an WAL reader.
   * @throws IOException if the reader cannot be created, closed or reset
   */
  public Reader openReader(Path path) throws IOException {
    boolean sameFileAlreadyOpen = this.reader != null && this.lastPath.equals(path);
    if (sameFileAlreadyOpen) {
      try {
        this.reader.reset();
      } catch (NullPointerException npe) {
        throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
      }
      return this.reader;
    }
    // new file: drop the previous reader (if any) and start over
    this.closeReader();
    this.reader = WALFactory.createReader(this.fs, path, this.conf);
    this.lastPath = path;
    return this.reader;
  }

  /**
   * Reads the next entry and records the reader position reached after it.
   * @return a new entry or null
   * @throws IOException if reading fails; the stored position is then left unchanged
   */
  public Entry readNextAndSetPosition() throws IOException {
    Entry nextEntry = this.reader.next();
    // Remember where we are so a later read can resume from here. If next() above throws,
    // this line is never reached and a retry starts from the last known good position.
    this.position = this.reader.getPosition();
    if (nextEntry == null) {
      return null;
    }
    // The CC must be null, otherwise the entry would be compressed when sent to the sink
    nextEntry.setCompressionContext(null);
    return nextEntry;
  }

  /**
   * Advance the reader to the current position; does nothing when the position is 0.
   * @throws IOException if the seek fails
   */
  public void seek() throws IOException {
    if (this.position == 0) {
      return;
    }
    this.reader.seek(this.position);
  }

  /**
   * Get the position that we stopped reading at
   * @return current position, cannot be negative
   */
  public long getPosition() {
    return this.position;
  }

  /**
   * @param pos the position to resume reading from
   */
  public void setPosition(long pos) {
    this.position = pos;
  }

  /**
   * @return the trailer size reported by the current reader when it is a
   *         {@link ProtobufLogReader}, otherwise -1
   */
  public long currentTrailerSize() {
    if (!(reader instanceof ProtobufLogReader)) {
      return -1L;
    }
    return ((ProtobufLogReader) reader).trailerSize();
  }

  /**
   * Close the current reader
   * @throws IOException if closing fails; the reader reference is kept in that case
   */
  public void closeReader() throws IOException {
    if (this.reader == null) {
      return;
    }
    this.reader.close();
    this.reader = null;
  }

  /**
   * Tell the helper to reset internal state: the position goes back to 0 and the current
   * reader is closed (a failure to close is only logged).
   */
  void finishCurrentFile() {
    this.position = 0;
    try {
      this.closeReader();
    } catch (IOException e) {
      LOG.warn("Unable to close reader", e);
    }
  }

}

View File

@ -0,0 +1,411 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
/**
* Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
* iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
* dequeues it and starts reading from the next.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
private Reader reader;
private Path currentPath;
// cache of next entry for hasNext()
private Entry currentEntry;
// position after reading current entry
private long currentPosition = 0;
private PriorityBlockingQueue<Path> logQueue;
private FileSystem fs;
private Configuration conf;
private MetricsSource metrics;
/**
* Create an entry stream over the given queue, starting at position 0 of the first WAL.
* Delegates to the constructor that takes an explicit start position.
* @param logQueue the queue of WAL paths
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param metrics replication metrics
* @throws IOException declared for callers; no reader is opened during construction
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
MetricsSource metrics)
throws IOException {
this(logQueue, fs, conf, 0, metrics);
}
/**
 * Create an entry stream over the given queue at the given start position. Only stores its
 * arguments; the first reader is opened lazily when entries are requested.
 * @param logQueue the queue of WAL paths
 * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
 * @param conf {@link Configuration} to use to create {@link Reader} for this stream
 * @param startPosition the position in the first WAL to start reading at
 * @param metrics replication metrics
 * @throws IOException declared for callers; not thrown by the construction itself
 */
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
    long startPosition, MetricsSource metrics) throws IOException {
  // where to read from
  this.logQueue = logQueue;
  this.currentPosition = startPosition;
  // how to read
  this.fs = fs;
  this.conf = conf;
  // where to report
  this.metrics = metrics;
}
/**
 * Reports whether an entry is available, trying to read one ahead if none is cached yet.
 * @return true if there is another WAL {@link Entry}
 * @throws WALEntryStreamRuntimeException if there was an Exception while reading
 */
@Override
public boolean hasNext() {
  if (currentEntry != null) {
    // an entry was already pre-fetched by an earlier call
    return true;
  }
  try {
    tryAdvanceEntry();
  } catch (Exception e) {
    throw new WALEntryStreamRuntimeException(e);
  }
  return currentEntry != null;
}
/**
 * Hands out the cached entry and clears the cache so the following hasNext() reads anew.
 * @return the next WAL entry in this stream
 * @throws WALEntryStreamRuntimeException if there was an IOException
 * @throws NoSuchElementException if no more entries in the stream.
 */
@Override
public Entry next() {
  if (!hasNext()) {
    throw new NoSuchElementException();
  }
  try {
    return currentEntry;
  } finally {
    currentEntry = null; // gets reloaded by hasNext()
  }
}
/**
* Not supported: entries cannot be removed through this iterator.
* @throws UnsupportedOperationException always
*/
@Override
public void remove() {
throw new UnsupportedOperationException();
}
/**
* Closes the underlying WAL reader, if one is open.
* @throws IOException if closing the reader fails
*/
@Override
public void close() throws IOException {
closeReader();
}
/**
* Returns this stream itself, so every call shares the same iteration state rather than
* starting a fresh pass.
* @return the iterator over WAL entries in the queue.
*/
@Override
public Iterator<Entry> iterator() {
return this;
}
/**
* Returns the position recorded after the most recent read from the underlying reader, or the
* start position if nothing has been read yet. Because hasNext() reads one entry ahead, this
* matches the last entry returned by next() only until hasNext() is called again.
* @return the current position in the current WAL
*/
public long getPosition() {
return currentPosition;
}
/**
* @return the {@link Path} of the current WAL, or null if no WAL has been opened yet
*/
public Path getCurrentPath() {
return currentPath;
}
// Human-readable description of what is being read right now, used in log messages.
private String getCurrentPathStat() {
  if (currentPath == null) {
    return "no replication ongoing, waiting for new log";
  }
  return "currently replicating from: " + currentPath + " at position: " + currentPosition
      + "\n";
}
/**
 * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
 * false). Resets the reader when one is open on a known path; otherwise does nothing.
 * @throws IOException if resetting the reader fails
 */
public void reset() throws IOException {
  boolean nothingOpen = reader == null || currentPath == null;
  if (nothingOpen) {
    return;
  }
  resetReader();
}
// Records the position in the current WAL that reading has reached (or should resume from).
private void setPosition(long position) {
currentPosition = position;
}
// Records the path of the WAL the current reader was opened on.
private void setCurrentPath(Path path) {
this.currentPath = path;
}
// Tries to load the next entry into currentEntry, moving on to the next WAL in the queue once
// the current one is fully consumed. Leaves currentEntry null when nothing can be read.
private void tryAdvanceEntry() throws IOException {
  if (!checkReader()) {
    // no WAL Reader available (e.g. there are no logs in the queue)
    return;
  }
  readNextEntryAndSetPosition();
  if (currentEntry != null) {
    return;
  }

  // no more entries in this log file - see if log was rolled
  boolean logWasRolled = logQueue.size() > 1;
  if (!logWasRolled) {
    // no other logs, we've simply hit the end of the current open log
    return;
  }

  // Before dequeueing, we should always get one more attempt at reading.
  // This is in case more entries came in after we opened the reader,
  // and a new log was enqueued while we were reading. See HBASE-6758
  resetReader();
  readNextEntryAndSetPosition();
  if (currentEntry != null) {
    return;
  }

  if (!checkAllBytesParsed()) {
    return;
  }
  // now we're certain we're done with this log file
  dequeueCurrentLog();
  if (openNextLog()) {
    readNextEntryAndSetPosition();
  }
}
// HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file.
// Returns true when the current WAL can be considered finished. Returns false when the file
// was closed cleanly but our position (plus trailer) is short of the file length; in that case
// the position is rewound to 0 and the reader is reset so the file gets read again.
private boolean checkAllBytesParsed() throws IOException {
  // -1 means the wal wasn't closed cleanly.
  final long trailerSize = currentTrailerSize();
  FileStatus stat = null;
  try {
    stat = fs.getFileStatus(this.currentPath);
  } catch (IOException exception) {
    // Best effort: without a length we cannot verify anything, so we fall through and treat
    // the WAL as completed. Keep the cause in the log.
    LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
        + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat(),
      exception);
    metrics.incrUnknownFileLengthForClosedWAL();
  }
  if (stat != null) {
    if (trailerSize < 0) {
      if (currentPosition < stat.getLen()) {
        final long skippedBytes = stat.getLen() - currentPosition;
        LOG.info("Reached the end of WAL file '" + currentPath
            + "'. It was not closed cleanly, so we did not parse " + skippedBytes
            + " bytes of data.");
        metrics.incrUncleanlyClosedWALs();
        metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
      }
    } else if (currentPosition + trailerSize < stat.getLen()) {
      LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
          + ", which is too far away from reported file length " + stat.getLen()
          + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
      // Capture the bytes that will be read a second time BEFORE rewinding; reading
      // currentPosition after setPosition(0) would feed the metric the reset value.
      final long repeatedBytes = currentPosition;
      setPosition(0);
      resetReader();
      metrics.incrRestartedWALReading();
      metrics.incrRepeatedFileBytes(repeatedBytes);
      return false;
    }
  }
  if (LOG.isTraceEnabled()) {
    LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
        + (stat == null ? "N/A" : stat.getLen()));
  }
  metrics.incrCompletedWAL();
  return true;
}
// Finishes the current WAL: closes its reader, removes the head of logQueue, rewinds the
// stream position to 0 for the next file and decrements the log-queue-size metric.
// The reader is closed before the queue is touched, so a failing close leaves the queue as is.
private void dequeueCurrentLog() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Reached the end of log " + currentPath);
}
closeReader();
logQueue.remove();
setPosition(0);
metrics.decrSizeOfLogQueue();
}
// Reads one entry from the reader into currentEntry (null when none is available) and moves
// currentPosition to wherever the reader ended up. Read metrics are updated only when an
// entry was actually returned.
private void readNextEntryAndSetPosition() throws IOException {
  Entry nextEntry = reader.next();
  long positionAfterRead = reader.getPosition();
  boolean gotEntry = nextEntry != null;
  if (gotEntry) {
    // bytes consumed are measured against the position before this read
    long bytesRead = positionAfterRead - currentPosition;
    metrics.incrLogEditsRead();
    metrics.incrLogReadInBytes(bytesRead);
  }
  currentEntry = nextEntry; // could be null
  currentPosition = positionAfterRead;
}
// Closes and forgets the current reader; a no-op when none is open. If close() throws, the
// reader reference is kept.
private void closeReader() throws IOException {
  if (reader == null) {
    return;
  }
  reader.close();
  reader = null;
}
// Ensures a reader is available: keeps the one we have, otherwise tries to open one on the
// next log in the queue. Returns whether a reader is available afterwards.
private boolean checkReader() throws IOException {
  return reader != null || openNextLog();
}
// Opens a reader on the log at the head of the queue (the head is only peeked, not removed).
// Returns true if a reader is open afterwards, false if the queue is empty or opening did not
// yield a reader.
private boolean openNextLog() throws IOException {
  Path head = logQueue.peek();
  if (head == null) {
    return false;
  }
  openReader(head);
  return reader != null;
}
/**
 * Looks for a file with the same name as {@code path} in the old-logs directory under the
 * HBase root directory.
 *
 * @param path the WAL path that could not be found at its original location
 * @return the location in the old-logs directory if a file exists there, otherwise
 *         {@code path} itself
 * @throws IOException if the file system lookup fails
 */
private Path getArchivedLog(Path path) throws IOException {
  final Path archiveDir = new Path(FSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME);
  final Path candidate = new Path(archiveDir, path.getName());

  if (!fs.exists(candidate)) {
    LOG.error("Couldn't locate log: " + path);
    return path;
  }

  LOG.info("Log " + path + " was moved to " + candidate);
  return candidate;
}
/**
 * Opens a reader on {@code path}, or resets the existing reader when {@code path} is already
 * the current path so that newly appended data becomes visible.
 * <p>
 * Failure handling:
 * <ul>
 * <li>{@link FileNotFoundException}: retries against the old-logs location returned by
 * {@code getArchivedLog}; rethrows if no such location exists.</li>
 * <li>{@code LeaseNotRecoveredException}: attempts lease recovery on {@code path} and leaves
 * {@code reader} null so that the caller can retry.</li>
 * <li>{@link NullPointerException}: treated as the HDFS-4380 race; leaves {@code reader} null
 * so that the caller can retry.</li>
 * </ul>
 *
 * @param path the WAL file to read
 * @throws IOException if the reader cannot be created or repositioned
 */
private void openReader(Path path) throws IOException {
  try {
    // Detect if this is a new file, if so get a new reader else
    // reset the current reader so that we see the new data
    if (reader == null || !getCurrentPath().equals(path)) {
      closeReader();
      reader = WALFactory.createReader(fs, path, conf);
      seek();
      setCurrentPath(path);
    } else {
      resetReader();
    }
  } catch (FileNotFoundException fnfe) {
    // If the log was archived, continue reading from there
    Path archivedLog = getArchivedLog(path);
    if (!path.equals(archivedLog)) {
      openReader(archivedLog);
    } else {
      throw fnfe;
    }
  } catch (LeaseNotRecoveredException lnre) {
    // HBASE-15019 the WAL was not closed due to some hiccup.
    // Use 'path', not 'currentPath': the current path is only updated after a successful
    // open, so here it may still be null or point at the previously read WAL.
    LOG.warn("Try to recover the WAL lease " + path, lnre);
    recoverLease(conf, path);
    reader = null;
  } catch (NullPointerException npe) {
    // Workaround for race condition in HDFS-4380
    // which throws a NPE if we open a file before any data node has the most recent block
    // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
    LOG.warn("Got NPE opening reader, will retry.", npe);
    reader = null;
  }
}
/**
 * Best-effort lease recovery for a WAL that was not closed cleanly (HBASE-15019). An
 * {@link IOException} raised during recovery is logged and not propagated.
 *
 * @param conf configuration used to look up the file system and the {@code FSUtils} instance
 * @param path the WAL whose lease should be recovered
 */
private void recoverLease(final Configuration conf, final Path path) {
  // Progress callback: logs each invocation and always returns true.
  final CancelableProgressable reporter = new CancelableProgressable() {
    @Override
    public boolean progress() {
      LOG.debug("recover WAL lease: " + path);
      return true;
    }
  };

  try {
    final FileSystem currentFs = FSUtils.getCurrentFileSystem(conf);
    FSUtils.getInstance(currentFs, conf).recoverFileLease(currentFs, path, conf, reporter);
  } catch (IOException e) {
    LOG.warn("unable to recover lease for WAL: " + path, e);
  }
}
/**
 * Resets the open reader and seeks back to the tracked position. If the file has gone missing,
 * reading continues from the old-logs location when one exists.
 *
 * @throws IOException if the file cannot be found in either location, if the reset or seek
 *         fails, or wrapping a {@link NullPointerException} raised while resetting
 */
private void resetReader() throws IOException {
  try {
    reader.reset();
    seek();
  } catch (FileNotFoundException fnfe) {
    // The log may have been archived in the meantime; continue reading from there.
    final Path relocated = getArchivedLog(currentPath);
    if (currentPath.equals(relocated)) {
      throw fnfe;
    }
    openReader(relocated);
  } catch (NullPointerException npe) {
    throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
  }
}
/**
 * Positions the reader at {@code currentPosition}. No seek is issued when the position is 0.
 *
 * @throws IOException if the reader cannot seek
 */
private void seek() throws IOException {
  if (currentPosition == 0) {
    return;
  }
  reader.seek(currentPosition);
}
/**
 * @return the trailer size reported by the current reader when it is a
 *         {@code ProtobufLogReader}; {@code -1} for any other reader, including none
 */
private long currentTrailerSize() {
  if (!(reader instanceof ProtobufLogReader)) {
    return -1L;
  }
  return ((ProtobufLogReader) reader).trailerSize();
}
/**
 * Unchecked wrapper around an {@link Exception}; the wrapped exception is kept as the cause.
 */
@InterfaceAudience.Private
public static class WALEntryStreamRuntimeException extends RuntimeException {
private static final long serialVersionUID = -6298201811259982568L;
/**
 * @param e the exception to wrap; becomes this exception's cause
 */
public WALEntryStreamRuntimeException(Exception e) {
super(e);
}
}
}

View File

@ -1,238 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
 * Parameterized test that drives a {@link ReplicationWALReaderManager} against a WAL written to
 * a mini DFS cluster. Each run is parameterized by the number of rows appended, the number of
 * KeyValues per WAL edit, and whether WAL compression is enabled.
 */
@Category({ReplicationTests.class, LargeTests.class})
@RunWith(Parameterized.class)
public class TestReplicationWALReaderManager {
// Shared across all parameterized runs; created once in setUpBeforeClass().
private static HBaseTestingUtility TEST_UTIL;
private static Configuration conf;
private static FileSystem fs;
private static MiniDFSCluster cluster;
private static final TableName tableName = TableName.valueOf("tablename");
private static final byte [] family = Bytes.toBytes("column");
private static final byte [] qualifier = Bytes.toBytes("qualifier");
private static final HRegionInfo info = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
private static final HTableDescriptor htd = new HTableDescriptor(tableName);
// Scope 0 for every family of htd; passed into each WALKey that is appended.
private static NavigableMap<byte[], Integer> scopes;
// Per-test state, rebuilt in setUp().
private WAL log;
private ReplicationWALReaderManager logManager;
// Records the newest WAL path announced through preLogRoll().
private PathWatcher pathWatcher;
// Parameters of the current run.
private int nbRows;
private int walEditKVs;
private final AtomicLong sequenceId = new AtomicLong(1);
@Rule public TestName tn = new TestName();
private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
/**
 * @return every combination of row count, KeyValue count and compression flag, each as a
 *         three-element array in that order
 */
@Parameters
public static Collection<Object[]> parameters() {
// Try out different combinations of row count and KeyValue count
int[] NB_ROWS = { 1500, 60000 };
int[] NB_KVS = { 1, 100 };
// whether compression is used
Boolean[] BOOL_VALS = { false, true };
List<Object[]> parameters = new ArrayList<Object[]>();
for (int nbRows : NB_ROWS) {
for (int walEditKVs : NB_KVS) {
for (boolean b : BOOL_VALS) {
Object[] arr = new Object[3];
arr[0] = nbRows;
arr[1] = walEditKVs;
arr[2] = b;
parameters.add(arr);
}
}
}
return parameters;
}
/**
 * @param nbRows number of edits appended in the bulk phase of {@link #test()}
 * @param walEditKVs number of KeyValues placed in each of those edits
 * @param enableCompression value written to {@code HConstants.ENABLE_WAL_COMPRESSION} in the
 *        shared test configuration
 */
public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
this.nbRows = nbRows;
this.walEditKVs = walEditKVs;
// Note: this mutates the configuration object shared by all runs.
TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
enableCompression);
mvcc.advanceTo(1);
}
/** Starts a three-datanode mini DFS cluster and builds the family scopes map. */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniDFSCluster(3);
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
}
/** Shuts the test utility's cluster down once all runs have finished. */
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
 * Creates a fresh reader manager and a WAL whose factory is named after the running test
 * method, with a {@link PathWatcher} registered as listener.
 */
@Before
public void setUp() throws Exception {
logManager = new ReplicationWALReaderManager(fs, conf);
List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(1);
pathWatcher = new PathWatcher();
listeners.add(pathWatcher);
final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
}
/** Closes the WAL created in {@link #setUp()}. */
@After
public void tearDown() throws Exception {
log.close();
}
/**
 * Walks the reader manager through four phases: read a single edit and hit the end; read a
 * second edit after re-opening; re-open after a roll and see no data and no position change;
 * then bulk-append {@code nbRows} edits to the new log and read them all back.
 */
@Test
public void test() throws Exception {
// Grab the path that was generated when the log rolled as part of its creation
Path path = pathWatcher.currentPath;
assertEquals(0, logManager.getPosition());
appendToLog();
// There's one edit in the log, read it. Reading past it needs to return nulls
assertNotNull(logManager.openReader(path));
logManager.seek();
WAL.Entry entry = logManager.readNextAndSetPosition();
assertNotNull(entry);
entry = logManager.readNextAndSetPosition();
assertNull(entry);
logManager.closeReader();
long oldPos = logManager.getPosition();
appendToLog();
// Read the newly added entry, make sure we made progress
assertNotNull(logManager.openReader(path));
logManager.seek();
entry = logManager.readNextAndSetPosition();
assertNotEquals(oldPos, logManager.getPosition());
assertNotNull(entry);
logManager.closeReader();
oldPos = logManager.getPosition();
log.rollWriter();
// We rolled but we still should see the end of the first log and not get data
assertNotNull(logManager.openReader(path));
logManager.seek();
entry = logManager.readNextAndSetPosition();
assertEquals(oldPos, logManager.getPosition());
assertNull(entry);
logManager.finishCurrentFile();
// Switch to the log created by the roll above and fill it with the parameterized workload.
path = pathWatcher.currentPath;
for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
log.rollWriter();
logManager.openReader(path);
logManager.seek();
// Every appended edit must be readable; a null before nbRows reads is a failure.
for (int i = 0; i < nbRows; i++) {
WAL.Entry e = logManager.readNextAndSetPosition();
if (e == null) {
fail("Should have enough entries");
}
}
}
/** Appends and syncs one edit holding a single KeyValue. */
private void appendToLog() throws IOException {
appendToLogPlus(1);
}
/**
 * Appends one edit holding {@code count} KeyValues and syncs it.
 *
 * @param count number of KeyValues in the edit
 */
private void appendToLogPlus(int count) throws IOException {
final long txid = log.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
log.sync(txid);
}
/**
 * @param count number of KeyValues to add
 * @return an edit with {@code count} KeyValues; each row key and timestamp is taken from the
 *         current time in milliseconds
 */
private WALEdit getWALEdits(int count) {
WALEdit edit = new WALEdit();
for (int i = 0; i < count; i++) {
edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
System.currentTimeMillis(), qualifier));
}
return edit;
}
/** WAL listener that remembers the new path passed to the latest {@code preLogRoll} call. */
class PathWatcher extends WALActionsListener.Base {
Path currentPath;
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
currentPath = newPath;
}
}
}

View File

@ -0,0 +1,440 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
/**
 * Tests {@link WALEntryStream} and {@link ReplicationSourceWALReaderThread} against WALs written
 * to a mini DFS cluster. Every WAL path announced through {@code preLogRoll} is added to
 * {@code walQueue}, which is the queue the streams under test read from.
 */
@RunWith(MockitoJUnitRunner.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {

  // Shared across all tests; created once in setUpBeforeClass().
  private static HBaseTestingUtility TEST_UTIL;
  private static Configuration conf;
  private static FileSystem fs;
  private static MiniDFSCluster cluster;
  private static final TableName tableName = TableName.valueOf("tablename");
  private static final byte[] family = Bytes.toBytes("column");
  private static final byte[] qualifier = Bytes.toBytes("qualifier");
  private static final HRegionInfo info =
      new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
  // Scope 0 for every family of htd; passed into each WALKey that is appended.
  private static NavigableMap<byte[], Integer> scopes;

  // Per-test state, rebuilt in setUp().
  private WAL log;
  PriorityBlockingQueue<Path> walQueue;
  private PathWatcher pathWatcher;

  @Rule
  public TestName tn = new TestName();
  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

  /** Starts a three-datanode mini DFS cluster and builds the family scopes map. */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    conf = TEST_UTIL.getConfiguration();
    TEST_UTIL.startMiniDFSCluster(3);
    cluster = TEST_UTIL.getDFSCluster();
    fs = cluster.getFileSystem();
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
  }

  /** Shuts the test utility's cluster down once all tests have finished. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates an empty WAL queue and a WAL whose factory is named after the running test method,
   * with a {@link PathWatcher} registered so that rolled paths are queued.
   */
  @Before
  public void setUp() throws Exception {
    walQueue = new PriorityBlockingQueue<>();
    List<WALActionsListener> listeners = new ArrayList<>();
    pathWatcher = new PathWatcher();
    listeners.add(pathWatcher);
    final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
    log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
  }

  /** Closes the WAL created by the most recent {@link #setUp()} call. */
  @After
  public void tearDown() throws Exception {
    log.close();
  }

  /**
   * Tries out different combinations of row count, KeyValue count and WAL compression, and
   * checks that a stream returns exactly the number of entries that were appended.
   */
  @Test
  public void testDifferentCounts() throws Exception {
    int[] NB_ROWS = { 1500, 60000 };
    int[] NB_KVS = { 1, 100 };
    // whether compression is used
    Boolean[] BOOL_VALS = { false, true };
    // The configuration is static and shared with every other test in this class, so remember
    // the compression setting and put it back afterwards instead of leaking the last value.
    final boolean compressionBefore = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
    try {
      for (int nbRows : NB_ROWS) {
        for (int walEditKVs : NB_KVS) {
          for (boolean isCompressionEnabled : BOOL_VALS) {
            conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
            mvcc.advanceTo(1);

            for (int i = 0; i < nbRows; i++) {
              appendToLogPlus(walEditKVs);
            }

            log.rollWriter();

            try (WALEntryStream entryStream =
                new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
              int i = 0;
              for (WAL.Entry e : entryStream) {
                assertNotNull(e);
                i++;
              }
              assertEquals(nbRows, i);

              // should've read all entries
              assertFalse(entryStream.hasNext());
            }
            // reset everything for next loop
            log.close();
            setUp();
          }
        }
      }
    } finally {
      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, compressionBefore);
    }
  }

  /**
   * Tests basic reading of log appends
   */
  @Test
  public void testAppendsWithRolls() throws Exception {
    appendToLog();

    long oldPos;
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
      // There's one edit in the log, read it. Reading past it needs to throw exception
      assertTrue(entryStream.hasNext());
      WAL.Entry entry = entryStream.next();
      assertNotNull(entry);
      assertFalse(entryStream.hasNext());
      try {
        entryStream.next();
        fail();
      } catch (NoSuchElementException e) {
        // expected
      }
      oldPos = entryStream.getPosition();
    }

    appendToLog();

    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
      // Read the newly added entry, make sure we made progress
      WAL.Entry entry = entryStream.next();
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);
      oldPos = entryStream.getPosition();
    }

    // We rolled but we still should see the end of the first log and get that item
    appendToLog();
    log.rollWriter();
    appendToLog();

    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
      WAL.Entry entry = entryStream.next();
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);

      // next item should come from the new log
      entry = entryStream.next();
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);

      // no more entries to read
      assertFalse(entryStream.hasNext());
    }
  }

  /**
   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
   * don't mistakenly dequeue the current log thinking we're done with it
   */
  @Test
  public void testLogrollWhileStreaming() throws Exception {
    appendToLog("1");
    appendToLog("2"); // 2
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
      assertEquals("1", getRow(entryStream.next()));

      appendToLog("3"); // 3 - comes in after reader opened
      log.rollWriter(); // log roll happening while we're reading
      appendToLog("4"); // 4 - this append is in the rolled log

      assertEquals("2", getRow(entryStream.next()));
      // we should not have dequeued yet since there's still an entry in first log
      assertEquals(2, walQueue.size());
      // if implemented improperly, this would be 4 and 3 would be skipped
      assertEquals("3", getRow(entryStream.next()));
      assertEquals("4", getRow(entryStream.next())); // 4
      assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
      assertFalse(entryStream.hasNext());
    }
  }

  /**
   * Tests that if writes come in while we have a stream open, we shouldn't miss them
   */
  @Test
  public void testNewEntriesWhileStreaming() throws Exception {
    appendToLog("1");
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
      entryStream.next(); // we've hit the end of the stream at this point

      // some new entries come in while we're streaming
      appendToLog("2");
      appendToLog("3");

      // don't see them
      assertFalse(entryStream.hasNext());

      // But we do if we reset
      entryStream.reset();
      assertEquals("2", getRow(entryStream.next()));
      assertEquals("3", getRow(entryStream.next()));
      assertFalse(entryStream.hasNext());
    }
  }

  /**
   * Tests that a second stream opened at the position where a first stream stopped sees the
   * entries appended after that point.
   */
  @Test
  public void testResumeStreamingFromPosition() throws Exception {
    long lastPosition = 0;
    appendToLog("1");
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
      entryStream.next(); // we've hit the end of the stream at this point
      appendToLog("2");
      appendToLog("3");
      lastPosition = entryStream.getPosition();
    }
    // next stream should pick up where we left off
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
      assertEquals("2", getRow(entryStream.next()));
      assertEquals("3", getRow(entryStream.next()));
      assertFalse(entryStream.hasNext()); // done
      assertEquals(1, walQueue.size());
    }
  }

  /**
   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
   * using the last position
   */
  @Test
  public void testPosition() throws Exception {
    long lastPosition = 0;
    appendEntriesToLog(3);
    // read only one element
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
      entryStream.next();
      lastPosition = entryStream.getPosition();
    }
    // there should still be two more entries from where we left off
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
      assertNotNull(entryStream.next());
      assertNotNull(entryStream.next());
      assertFalse(entryStream.hasNext());
    }
  }

  /** Tests that a stream over a WAL with no appended entries reports nothing to read. */
  @Test
  public void testEmptyStream() throws Exception {
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
      assertFalse(entryStream.hasNext());
    }
  }

  /**
   * Tests that the reader thread batches the three existing entries with the expected end
   * position and path, and then delivers a later append as a batch of its own.
   */
  @Test
  public void testReplicationSourceWALReaderThread() throws Exception {
    appendEntriesToLog(3);
    // get ending position
    long position;
    try (WALEntryStream entryStream =
        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
      entryStream.next();
      entryStream.next();
      entryStream.next();
      position = entryStream.getPosition();
    }

    // start up a batcher
    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
    ReplicationSourceWALReaderThread batcher =
        new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, 0,
            fs, conf, getDummyFilter(), new MetricsSource("1"));
    Path walPath = walQueue.peek();
    batcher.start();
    WALEntryBatch entryBatch = batcher.take();

    // should've batched up our entries
    assertNotNull(entryBatch);
    assertEquals(3, entryBatch.getWalEntries().size());
    assertEquals(position, entryBatch.getLastWalPosition());
    assertEquals(walPath, entryBatch.getLastWalPath());
    assertEquals(3, entryBatch.getNbRowKeys());

    appendToLog("foo");
    entryBatch = batcher.take();
    assertEquals(1, entryBatch.getNbEntries());
    // expected value goes first so that a failure message reads correctly
    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
  }

  /**
   * @param entry a WAL entry with at least one cell
   * @return the row key of the entry's first cell, decoded as a string
   */
  private String getRow(WAL.Entry entry) {
    Cell cell = entry.getEdit().getCells().get(0);
    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
  }

  /**
   * Appends and syncs one edit holding a single KeyValue for the given row.
   *
   * @param key row key of the KeyValue
   */
  private void appendToLog(String key) throws IOException {
    final long txid = log.append(info,
      new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes),
      getWALEdit(key), true);
    log.sync(txid);
  }

  /**
   * Appends {@code count} single-KeyValue edits, syncing after each one.
   *
   * @param count number of edits to append
   */
  private void appendEntriesToLog(int count) throws IOException {
    for (int i = 0; i < count; i++) {
      appendToLog();
    }
  }

  /** Appends and syncs one edit holding a single KeyValue. */
  private void appendToLog() throws IOException {
    appendToLogPlus(1);
  }

  /**
   * Appends one edit holding {@code count} KeyValues and syncs it.
   *
   * @param count number of KeyValues in the edit
   */
  private void appendToLogPlus(int count) throws IOException {
    final long txid = log.append(info,
      new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes),
      getWALEdits(count), true);
    log.sync(txid);
  }

  /**
   * @param count number of KeyValues to add
   * @return an edit with {@code count} KeyValues; each row key and timestamp is taken from the
   *         current time in milliseconds
   */
  private WALEdit getWALEdits(int count) {
    WALEdit edit = new WALEdit();
    for (int i = 0; i < count; i++) {
      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
          System.currentTimeMillis(), qualifier));
    }
    return edit;
  }

  /**
   * @param row row key for the single KeyValue
   * @return an edit holding one KeyValue for {@code row}
   */
  private WALEdit getWALEdit(String row) {
    WALEdit edit = new WALEdit();
    edit.add(
      new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier));
    return edit;
  }

  /** @return a filter that passes every entry through unchanged */
  private WALEntryFilter getDummyFilter() {
    return new WALEntryFilter() {

      @Override
      public Entry filter(Entry entry) {
        return entry;
      }
    };
  }

  /** @return queue info built from the id {@code "1"} */
  private ReplicationQueueInfo getQueueInfo() {
    return new ReplicationQueueInfo("1");
  }

  /**
   * WAL listener that queues every new WAL path announced through {@code preLogRoll} and
   * remembers the most recent one. Kept as an inner class because it writes to the enclosing
   * test's {@code walQueue}.
   */
  class PathWatcher extends WALActionsListener.Base {

    Path currentPath;

    @Override
    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
      walQueue.add(newPath);
      currentPath = newPath;
    }
  }

}