HBASE-15995 Separate replication WAL reading from shipping
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
b66a478e73
commit
3cf4433260
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
||||
|
||||
/**
|
||||
* Filters out entries with our peerClusterId (i.e. already replicated)
|
||||
* and marks all other entries with our clusterID
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||
@InterfaceStability.Evolving
|
||||
public class ClusterMarkingEntryFilter implements WALEntryFilter {
|
||||
private UUID clusterId;
|
||||
private UUID peerClusterId;
|
||||
private ReplicationEndpoint replicationEndpoint;
|
||||
|
||||
/**
|
||||
* @param clusterId id of this cluster
|
||||
* @param peerClusterId of the other cluster
|
||||
* @param replicationEndpoint ReplicationEndpoint which will handle the actual replication
|
||||
*/
|
||||
public ClusterMarkingEntryFilter(UUID clusterId, UUID peerClusterId, ReplicationEndpoint replicationEndpoint) {
|
||||
this.clusterId = clusterId;
|
||||
this.peerClusterId = peerClusterId;
|
||||
this.replicationEndpoint = replicationEndpoint;
|
||||
}
|
||||
@Override
|
||||
public Entry filter(Entry entry) {
|
||||
// don't replicate if the log entries have already been consumed by the cluster
|
||||
if (replicationEndpoint.canReplicateToSameCluster()
|
||||
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
WALKey logKey = entry.getKey();
|
||||
|
||||
if (edit != null && !edit.isEmpty()) {
|
||||
// Mark that the current cluster has the change
|
||||
logKey.addClusterId(clusterId);
|
||||
// We need to set the CC to null else it will be compressed when sent to the sink
|
||||
entry.setCompressionContext(null);
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,471 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
||||
/**
|
||||
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class ReplicationSourceWALReaderThread extends Thread {
|
||||
private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class);
|
||||
|
||||
private PriorityBlockingQueue<Path> logQueue;
|
||||
private FileSystem fs;
|
||||
private Configuration conf;
|
||||
private BlockingQueue<WALEntryBatch> entryBatchQueue;
|
||||
// max (heap) size of each batch - multiply by number of batches in queue to get total
|
||||
private long replicationBatchSizeCapacity;
|
||||
// max count of each batch - multiply by number of batches in queue to get total
|
||||
private int replicationBatchCountCapacity;
|
||||
// position in the WAL to start reading at
|
||||
private long currentPosition;
|
||||
private WALEntryFilter filter;
|
||||
private long sleepForRetries;
|
||||
//Indicates whether this particular worker is running
|
||||
private boolean isReaderRunning = true;
|
||||
private ReplicationQueueInfo replicationQueueInfo;
|
||||
private int maxRetriesMultiplier;
|
||||
private MetricsSource metrics;
|
||||
|
||||
private AtomicLong totalBufferUsed;
|
||||
private long totalBufferQuota;
|
||||
|
||||
/**
|
||||
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
|
||||
* entries, and puts them on a batch queue.
|
||||
* @param manager replication manager
|
||||
* @param replicationQueueInfo
|
||||
* @param logQueue The WAL queue to read off of
|
||||
* @param startPosition position in the first WAL to start reading from
|
||||
* @param fs the files system to use
|
||||
* @param conf configuration to use
|
||||
* @param filter The filter to use while reading
|
||||
* @param metrics replication metrics
|
||||
*/
|
||||
public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
|
||||
ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
|
||||
long startPosition,
|
||||
FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
|
||||
this.replicationQueueInfo = replicationQueueInfo;
|
||||
this.logQueue = logQueue;
|
||||
this.currentPosition = startPosition;
|
||||
this.fs = fs;
|
||||
this.conf = conf;
|
||||
this.filter = filter;
|
||||
this.replicationBatchSizeCapacity =
|
||||
this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
|
||||
this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
|
||||
// memory used will be batchSizeCapacity * (nb.batches + 1)
|
||||
// the +1 is for the current thread reading before placing onto the queue
|
||||
int batchCount = conf.getInt("replication.source.nb.batches", 1);
|
||||
this.totalBufferUsed = manager.getTotalBufferUsed();
|
||||
this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
|
||||
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
|
||||
this.sleepForRetries =
|
||||
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
|
||||
this.maxRetriesMultiplier =
|
||||
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
|
||||
this.metrics = metrics;
|
||||
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
|
||||
LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
|
||||
+ ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
|
||||
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
|
||||
+ ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
|
||||
+ ", replicationBatchQueueCapacity=" + batchCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
int sleepMultiplier = 1;
|
||||
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
|
||||
while (isReaderRunning()) { // loop here to keep reusing stream while we can
|
||||
if (!checkQuota()) {
|
||||
continue;
|
||||
}
|
||||
WALEntryBatch batch = null;
|
||||
while (entryStream.hasNext()) {
|
||||
if (batch == null) {
|
||||
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
|
||||
}
|
||||
Entry entry = entryStream.next();
|
||||
if (updateSerialReplPos(batch, entry)) {
|
||||
batch.lastWalPosition = entryStream.getPosition();
|
||||
break;
|
||||
}
|
||||
entry = filterEntry(entry);
|
||||
if (entry != null) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
if (edit != null && !edit.isEmpty()) {
|
||||
long entrySize = getEntrySize(entry);
|
||||
batch.addEntry(entry);
|
||||
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
|
||||
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
|
||||
// Stop if too many entries or too big
|
||||
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|
||||
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(String.format("Read %s WAL entries eligible for replication",
|
||||
batch.getNbEntries()));
|
||||
}
|
||||
entryBatchQueue.put(batch);
|
||||
sleepMultiplier = 1;
|
||||
} else { // got no entries and didn't advance position in WAL
|
||||
LOG.trace("Didn't read any new entries from WAL");
|
||||
if (replicationQueueInfo.isQueueRecovered()) {
|
||||
// we're done with queue recovery, shut ourself down
|
||||
setReaderRunning(false);
|
||||
// shuts down shipper thread immediately
|
||||
entryBatchQueue.put(batch != null ? batch
|
||||
: new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
|
||||
} else {
|
||||
Thread.sleep(sleepForRetries);
|
||||
}
|
||||
}
|
||||
currentPosition = entryStream.getPosition();
|
||||
entryStream.reset(); // reuse stream
|
||||
}
|
||||
} catch (IOException | WALEntryStreamRuntimeException e) { // stream related
|
||||
if (sleepMultiplier < maxRetriesMultiplier) {
|
||||
LOG.debug("Failed to read stream of replication entries: " + e);
|
||||
sleepMultiplier++;
|
||||
} else {
|
||||
LOG.error("Failed to read stream of replication entries", e);
|
||||
}
|
||||
Threads.sleep(sleepForRetries * sleepMultiplier);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.trace("Interrupted while sleeping between WAL reads");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//returns false if we've already exceeded the global quota
|
||||
private boolean checkQuota() {
|
||||
// try not to go over total quota
|
||||
if (totalBufferUsed.get() > totalBufferQuota) {
|
||||
Threads.sleep(sleepForRetries);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private Entry filterEntry(Entry entry) {
|
||||
Entry filtered = filter.filter(entry);
|
||||
if (entry != null && filtered == null) {
|
||||
metrics.incrLogEditsFiltered();
|
||||
}
|
||||
return filtered;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if we should stop reading because we're at REGION_CLOSE
|
||||
*/
|
||||
private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
|
||||
if (entry.hasSerialReplicationScope()) {
|
||||
String key = Bytes.toString(entry.getKey().getEncodedRegionName());
|
||||
batch.setLastPosition(key, entry.getKey().getSequenceId());
|
||||
if (!entry.getEdit().getCells().isEmpty()) {
|
||||
WALProtos.RegionEventDescriptor maybeEvent =
|
||||
WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
|
||||
if (maybeEvent != null && maybeEvent
|
||||
.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
|
||||
// In serially replication, if we move a region to another RS and move it back, we may
|
||||
// read logs crossing two sections. We should break at REGION_CLOSE and push the first
|
||||
// section first in case of missing the middle section belonging to the other RS.
|
||||
// In a worker thread, if we can push the first log of a region, we can push all logs
|
||||
// in the same region without waiting until we read a close marker because next time
|
||||
// we read logs in this region, it must be a new section and not adjacent with this
|
||||
// region. Mark it negative.
|
||||
batch.setLastPosition(key, -entry.getKey().getSequenceId());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
|
||||
* batch to become available
|
||||
* @return A batch of entries, along with the position in the log after reading the batch
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
*/
|
||||
public WALEntryBatch take() throws InterruptedException {
|
||||
return entryBatchQueue.take();
|
||||
}
|
||||
|
||||
private long getEntrySize(Entry entry) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
|
||||
}
|
||||
|
||||
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
if (edit != null && !edit.isEmpty()) {
|
||||
batch.incrementHeapSize(entrySize);
|
||||
Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
|
||||
batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
|
||||
batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
|
||||
}
|
||||
batch.lastWalPosition = entryPosition;
|
||||
}
|
||||
|
||||
/**
|
||||
* Count the number of different row keys in the given edit because of mini-batching. We assume
|
||||
* that there's at least one Cell in the WALEdit.
|
||||
* @param edit edit to count row keys from
|
||||
* @return number of different row keys and HFiles
|
||||
*/
|
||||
private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
|
||||
List<Cell> cells = edit.getCells();
|
||||
int distinctRowKeys = 1;
|
||||
int totalHFileEntries = 0;
|
||||
Cell lastCell = cells.get(0);
|
||||
|
||||
int totalCells = edit.size();
|
||||
for (int i = 0; i < totalCells; i++) {
|
||||
// Count HFiles to be replicated
|
||||
if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
|
||||
try {
|
||||
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
|
||||
List<StoreDescriptor> stores = bld.getStoresList();
|
||||
int totalStores = stores.size();
|
||||
for (int j = 0; j < totalStores; j++) {
|
||||
totalHFileEntries += stores.get(j).getStoreFileList().size();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to deserialize bulk load entry from wal edit. "
|
||||
+ "Then its hfiles count will not be added into metric.");
|
||||
}
|
||||
}
|
||||
|
||||
if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
|
||||
distinctRowKeys++;
|
||||
}
|
||||
lastCell = cells.get(i);
|
||||
}
|
||||
|
||||
Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the total size of all the store files
|
||||
* @param edit edit to count row keys from
|
||||
* @return the total size of the store files
|
||||
*/
|
||||
private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
|
||||
List<Cell> cells = edit.getCells();
|
||||
int totalStoreFilesSize = 0;
|
||||
|
||||
int totalCells = edit.size();
|
||||
for (int i = 0; i < totalCells; i++) {
|
||||
if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
|
||||
try {
|
||||
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
|
||||
List<StoreDescriptor> stores = bld.getStoresList();
|
||||
int totalStores = stores.size();
|
||||
for (int j = 0; j < totalStores; j++) {
|
||||
totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to deserialize bulk load entry from wal edit. "
|
||||
+ "Size of HFiles part of cell will not be considered in replication "
|
||||
+ "request size calculation.",
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return totalStoreFilesSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param size delta size for grown buffer
|
||||
* @return true if we should clear buffer and push all
|
||||
*/
|
||||
private boolean acquireBufferQuota(long size) {
|
||||
return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether the reader thread is running
|
||||
*/
|
||||
public boolean isReaderRunning() {
|
||||
return isReaderRunning && !isInterrupted();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param readerRunning the readerRunning to set
|
||||
*/
|
||||
public void setReaderRunning(boolean readerRunning) {
|
||||
this.isReaderRunning = readerRunning;
|
||||
}
|
||||
|
||||
/**
|
||||
* Holds a batch of WAL entries to replicate, along with some statistics
|
||||
*
|
||||
*/
|
||||
static class WALEntryBatch {
|
||||
private List<Entry> walEntries;
|
||||
// last WAL that was read
|
||||
private Path lastWalPath;
|
||||
// position in WAL of last entry in this batch
|
||||
private long lastWalPosition = 0;
|
||||
// number of distinct row keys in this batch
|
||||
private int nbRowKeys = 0;
|
||||
// number of HFiles
|
||||
private int nbHFiles = 0;
|
||||
// heap size of data we need to replicate
|
||||
private long heapSize = 0;
|
||||
// save the last sequenceid for each region if the table has serial-replication scope
|
||||
private Map<String, Long> lastSeqIds = new HashMap<>();
|
||||
|
||||
/**
|
||||
* @param walEntries
|
||||
* @param lastWalPath Path of the WAL the last entry in this batch was read from
|
||||
* @param lastWalPosition Position in the WAL the last entry in this batch was read from
|
||||
*/
|
||||
private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
|
||||
this.walEntries = new ArrayList<>(maxNbEntries);
|
||||
this.lastWalPath = lastWalPath;
|
||||
}
|
||||
|
||||
public void addEntry(Entry entry) {
|
||||
walEntries.add(entry);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the WAL Entries.
|
||||
*/
|
||||
public List<Entry> getWalEntries() {
|
||||
return walEntries;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the path of the last WAL that was read.
|
||||
*/
|
||||
public Path getLastWalPath() {
|
||||
return lastWalPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the position in the last WAL that was read.
|
||||
*/
|
||||
public long getLastWalPosition() {
|
||||
return lastWalPosition;
|
||||
}
|
||||
|
||||
public int getNbEntries() {
|
||||
return walEntries.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of distinct row keys in this batch
|
||||
*/
|
||||
public int getNbRowKeys() {
|
||||
return nbRowKeys;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of HFiles in this batch
|
||||
*/
|
||||
public int getNbHFiles() {
|
||||
return nbHFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return total number of operations in this batch
|
||||
*/
|
||||
public int getNbOperations() {
|
||||
return getNbRowKeys() + getNbHFiles();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the heap size of this batch
|
||||
*/
|
||||
public long getHeapSize() {
|
||||
return heapSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the last sequenceid for each region if the table has serial-replication scope
|
||||
*/
|
||||
public Map<String, Long> getLastSeqIds() {
|
||||
return lastSeqIds;
|
||||
}
|
||||
|
||||
private void incrementNbRowKeys(int increment) {
|
||||
nbRowKeys += increment;
|
||||
}
|
||||
|
||||
private void incrementNbHFiles(int increment) {
|
||||
nbHFiles += increment;
|
||||
}
|
||||
|
||||
private void incrementHeapSize(long increment) {
|
||||
heapSize += increment;
|
||||
}
|
||||
|
||||
private void setLastPosition(String region, Long sequenceId) {
|
||||
getLastSeqIds().put(region, sequenceId);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,155 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Reader;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Wrapper class around WAL to help manage the implementation details
|
||||
* such as compression.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationWALReaderManager {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ReplicationWALReaderManager.class);
|
||||
private final FileSystem fs;
|
||||
private final Configuration conf;
|
||||
private long position = 0;
|
||||
private Reader reader;
|
||||
private Path lastPath;
|
||||
|
||||
/**
|
||||
* Creates the helper but doesn't open any file
|
||||
* Use setInitialPosition after using the constructor if some content needs to be skipped
|
||||
* @param fs
|
||||
* @param conf
|
||||
*/
|
||||
public ReplicationWALReaderManager(FileSystem fs, Configuration conf) {
|
||||
this.fs = fs;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Opens the file at the current position
|
||||
* @param path
|
||||
* @return an WAL reader.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Reader openReader(Path path) throws IOException {
|
||||
// Detect if this is a new file, if so get a new reader else
|
||||
// reset the current reader so that we see the new data
|
||||
if (this.reader == null || !this.lastPath.equals(path)) {
|
||||
this.closeReader();
|
||||
this.reader = WALFactory.createReader(this.fs, path, this.conf);
|
||||
this.lastPath = path;
|
||||
} else {
|
||||
try {
|
||||
this.reader.reset();
|
||||
} catch (NullPointerException npe) {
|
||||
throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
|
||||
}
|
||||
}
|
||||
return this.reader;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next entry, returned and also added in the array
|
||||
* @return a new entry or null
|
||||
* @throws IOException
|
||||
*/
|
||||
public Entry readNextAndSetPosition() throws IOException {
|
||||
Entry entry = this.reader.next();
|
||||
// Store the position so that in the future the reader can start
|
||||
// reading from here. If the above call to next() throws an
|
||||
// exception, the position won't be changed and retry will happen
|
||||
// from the last known good position
|
||||
this.position = this.reader.getPosition();
|
||||
// We need to set the CC to null else it will be compressed when sent to the sink
|
||||
if (entry != null) {
|
||||
entry.setCompressionContext(null);
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Advance the reader to the current position
|
||||
* @throws IOException
|
||||
*/
|
||||
public void seek() throws IOException {
|
||||
if (this.position != 0) {
|
||||
this.reader.seek(this.position);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the position that we stopped reading at
|
||||
* @return current position, cannot be negative
|
||||
*/
|
||||
public long getPosition() {
|
||||
return this.position;
|
||||
}
|
||||
|
||||
public void setPosition(long pos) {
|
||||
this.position = pos;
|
||||
}
|
||||
|
||||
public long currentTrailerSize() {
|
||||
long size = -1L;
|
||||
if (reader instanceof ProtobufLogReader) {
|
||||
final ProtobufLogReader pblr = (ProtobufLogReader)reader;
|
||||
size = pblr.trailerSize();
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the current reader
|
||||
* @throws IOException
|
||||
*/
|
||||
public void closeReader() throws IOException {
|
||||
if (this.reader != null) {
|
||||
this.reader.close();
|
||||
this.reader = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tell the helper to reset internal state
|
||||
*/
|
||||
void finishCurrentFile() {
|
||||
this.position = 0;
|
||||
try {
|
||||
this.closeReader();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to close reader", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,411 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Reader;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
|
||||
/**
|
||||
* Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
|
||||
* iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
|
||||
* dequeues it and starts reading from the next.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
|
||||
private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
|
||||
|
||||
private Reader reader;
|
||||
private Path currentPath;
|
||||
// cache of next entry for hasNext()
|
||||
private Entry currentEntry;
|
||||
// position after reading current entry
|
||||
private long currentPosition = 0;
|
||||
private PriorityBlockingQueue<Path> logQueue;
|
||||
private FileSystem fs;
|
||||
private Configuration conf;
|
||||
private MetricsSource metrics;
|
||||
|
||||
/**
|
||||
* Create an entry stream over the given queue
|
||||
* @param logQueue the queue of WAL paths
|
||||
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
|
||||
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
|
||||
* @param metrics replication metrics
|
||||
* @throws IOException
|
||||
*/
|
||||
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
|
||||
MetricsSource metrics)
|
||||
throws IOException {
|
||||
this(logQueue, fs, conf, 0, metrics);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an entry stream over the given queue at the given start position
|
||||
* @param logQueue the queue of WAL paths
|
||||
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
|
||||
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
|
||||
* @param startPosition the position in the first WAL to start reading at
|
||||
* @param metrics replication metrics
|
||||
* @throws IOException
|
||||
*/
|
||||
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
|
||||
long startPosition, MetricsSource metrics) throws IOException {
|
||||
this.logQueue = logQueue;
|
||||
this.fs = fs;
|
||||
this.conf = conf;
|
||||
this.currentPosition = startPosition;
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if there is another WAL {@link Entry}
|
||||
* @throws WALEntryStreamRuntimeException if there was an Exception while reading
|
||||
*/
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
if (currentEntry == null) {
|
||||
try {
|
||||
tryAdvanceEntry();
|
||||
} catch (Exception e) {
|
||||
throw new WALEntryStreamRuntimeException(e);
|
||||
}
|
||||
}
|
||||
return currentEntry != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the next WAL entry in this stream
|
||||
* @throws WALEntryStreamRuntimeException if there was an IOException
|
||||
* @throws NoSuchElementException if no more entries in the stream.
|
||||
*/
|
||||
@Override
|
||||
public Entry next() {
|
||||
if (!hasNext()) throw new NoSuchElementException();
|
||||
Entry save = currentEntry;
|
||||
currentEntry = null; // gets reloaded by hasNext()
|
||||
return save;
|
||||
}
|
||||
|
||||
/**
|
||||
* Not supported.
|
||||
*/
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
closeReader();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the iterator over WAL entries in the queue.
|
||||
*/
|
||||
@Override
|
||||
public Iterator<Entry> iterator() {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the position of the last Entry returned by next()
|
||||
*/
|
||||
public long getPosition() {
|
||||
return currentPosition;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the {@link Path} of the current WAL
|
||||
*/
|
||||
public Path getCurrentPath() {
|
||||
return currentPath;
|
||||
}
|
||||
|
||||
private String getCurrentPathStat() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
if (currentPath != null) {
|
||||
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
|
||||
.append(currentPosition).append("\n");
|
||||
} else {
|
||||
sb.append("no replication ongoing, waiting for new log");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
|
||||
* false)
|
||||
* @throws IOException
|
||||
*/
|
||||
public void reset() throws IOException {
|
||||
if (reader != null && currentPath != null) {
|
||||
resetReader();
|
||||
}
|
||||
}
|
||||
|
||||
private void setPosition(long position) {
|
||||
currentPosition = position;
|
||||
}
|
||||
|
||||
private void setCurrentPath(Path path) {
|
||||
this.currentPath = path;
|
||||
}
|
||||
|
||||
private void tryAdvanceEntry() throws IOException {
|
||||
if (checkReader()) {
|
||||
readNextEntryAndSetPosition();
|
||||
if (currentEntry == null) { // no more entries in this log file - see if log was rolled
|
||||
if (logQueue.size() > 1) { // log was rolled
|
||||
// Before dequeueing, we should always get one more attempt at reading.
|
||||
// This is in case more entries came in after we opened the reader,
|
||||
// and a new log was enqueued while we were reading. See HBASE-6758
|
||||
resetReader();
|
||||
readNextEntryAndSetPosition();
|
||||
if (currentEntry == null) {
|
||||
if (checkAllBytesParsed()) { // now we're certain we're done with this log file
|
||||
dequeueCurrentLog();
|
||||
if (openNextLog()) {
|
||||
readNextEntryAndSetPosition();
|
||||
}
|
||||
}
|
||||
}
|
||||
} // no other logs, we've simply hit the end of the current open log. Do nothing
|
||||
}
|
||||
}
|
||||
// do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
|
||||
}
|
||||
|
||||
// HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
|
||||
private boolean checkAllBytesParsed() throws IOException {
|
||||
// -1 means the wal wasn't closed cleanly.
|
||||
final long trailerSize = currentTrailerSize();
|
||||
FileStatus stat = null;
|
||||
try {
|
||||
stat = fs.getFileStatus(this.currentPath);
|
||||
} catch (IOException exception) {
|
||||
LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
|
||||
+ (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat());
|
||||
metrics.incrUnknownFileLengthForClosedWAL();
|
||||
}
|
||||
if (stat != null) {
|
||||
if (trailerSize < 0) {
|
||||
if (currentPosition < stat.getLen()) {
|
||||
final long skippedBytes = stat.getLen() - currentPosition;
|
||||
LOG.info("Reached the end of WAL file '" + currentPath
|
||||
+ "'. It was not closed cleanly, so we did not parse " + skippedBytes
|
||||
+ " bytes of data.");
|
||||
metrics.incrUncleanlyClosedWALs();
|
||||
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
|
||||
}
|
||||
} else if (currentPosition + trailerSize < stat.getLen()) {
|
||||
LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
|
||||
+ ", which is too far away from reported file length " + stat.getLen()
|
||||
+ ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
|
||||
setPosition(0);
|
||||
resetReader();
|
||||
metrics.incrRestartedWALReading();
|
||||
metrics.incrRepeatedFileBytes(currentPosition);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
|
||||
+ (stat == null ? "N/A" : stat.getLen()));
|
||||
}
|
||||
metrics.incrCompletedWAL();
|
||||
return true;
|
||||
}
|
||||
|
||||
private void dequeueCurrentLog() throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Reached the end of log " + currentPath);
|
||||
}
|
||||
closeReader();
|
||||
logQueue.remove();
|
||||
setPosition(0);
|
||||
metrics.decrSizeOfLogQueue();
|
||||
}
|
||||
|
||||
private void readNextEntryAndSetPosition() throws IOException {
|
||||
Entry readEntry = reader.next();
|
||||
long readerPos = reader.getPosition();
|
||||
if (readEntry != null) {
|
||||
metrics.incrLogEditsRead();
|
||||
metrics.incrLogReadInBytes(readerPos - currentPosition);
|
||||
}
|
||||
currentEntry = readEntry; // could be null
|
||||
setPosition(readerPos);
|
||||
}
|
||||
|
||||
private void closeReader() throws IOException {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
reader = null;
|
||||
}
|
||||
}
|
||||
|
||||
// if we don't have a reader, open a reader on the next log
|
||||
private boolean checkReader() throws IOException {
|
||||
if (reader == null) {
|
||||
return openNextLog();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// open a reader on the next log in queue
|
||||
private boolean openNextLog() throws IOException {
|
||||
Path nextPath = logQueue.peek();
|
||||
if (nextPath != null) {
|
||||
openReader(nextPath);
|
||||
if (reader != null) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private Path getArchivedLog(Path path) throws IOException {
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
Path archivedLogLocation = new Path(oldLogDir, path.getName());
|
||||
if (fs.exists(archivedLogLocation)) {
|
||||
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
|
||||
return archivedLogLocation;
|
||||
} else {
|
||||
LOG.error("Couldn't locate log: " + path);
|
||||
return path;
|
||||
}
|
||||
}
|
||||
|
||||
private void openReader(Path path) throws IOException {
|
||||
try {
|
||||
// Detect if this is a new file, if so get a new reader else
|
||||
// reset the current reader so that we see the new data
|
||||
if (reader == null || !getCurrentPath().equals(path)) {
|
||||
closeReader();
|
||||
reader = WALFactory.createReader(fs, path, conf);
|
||||
seek();
|
||||
setCurrentPath(path);
|
||||
} else {
|
||||
resetReader();
|
||||
}
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// If the log was archived, continue reading from there
|
||||
Path archivedLog = getArchivedLog(path);
|
||||
if (!path.equals(archivedLog)) {
|
||||
openReader(archivedLog);
|
||||
} else {
|
||||
throw fnfe;
|
||||
}
|
||||
} catch (LeaseNotRecoveredException lnre) {
|
||||
// HBASE-15019 the WAL was not closed due to some hiccup.
|
||||
LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
|
||||
recoverLease(conf, currentPath);
|
||||
reader = null;
|
||||
} catch (NullPointerException npe) {
|
||||
// Workaround for race condition in HDFS-4380
|
||||
// which throws a NPE if we open a file before any data node has the most recent block
|
||||
// Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
|
||||
LOG.warn("Got NPE opening reader, will retry.");
|
||||
reader = null;
|
||||
}
|
||||
}
|
||||
|
||||
// For HBASE-15019
|
||||
private void recoverLease(final Configuration conf, final Path path) {
|
||||
try {
|
||||
final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
|
||||
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
|
||||
fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
|
||||
@Override
|
||||
public boolean progress() {
|
||||
LOG.debug("recover WAL lease: " + path);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
LOG.warn("unable to recover lease for WAL: " + path, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void resetReader() throws IOException {
|
||||
try {
|
||||
reader.reset();
|
||||
seek();
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// If the log was archived, continue reading from there
|
||||
Path archivedLog = getArchivedLog(currentPath);
|
||||
if (!currentPath.equals(archivedLog)) {
|
||||
openReader(archivedLog);
|
||||
} else {
|
||||
throw fnfe;
|
||||
}
|
||||
} catch (NullPointerException npe) {
|
||||
throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
|
||||
}
|
||||
}
|
||||
|
||||
private void seek() throws IOException {
|
||||
if (currentPosition != 0) {
|
||||
reader.seek(currentPosition);
|
||||
}
|
||||
}
|
||||
|
||||
private long currentTrailerSize() {
|
||||
long size = -1L;
|
||||
if (reader instanceof ProtobufLogReader) {
|
||||
final ProtobufLogReader pblr = (ProtobufLogReader) reader;
|
||||
size = pblr.trailerSize();
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public static class WALEntryStreamRuntimeException extends RuntimeException {
|
||||
private static final long serialVersionUID = -6298201811259982568L;
|
||||
|
||||
public WALEntryStreamRuntimeException(Exception e) {
|
||||
super(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,228 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
@Category({LargeTests.class})
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestReplicationWALReaderManager {
|
||||
|
||||
private static HBaseTestingUtility TEST_UTIL;
|
||||
private static Configuration conf;
|
||||
private static FileSystem fs;
|
||||
private static MiniDFSCluster cluster;
|
||||
private static final TableName tableName = TableName.valueOf("tablename");
|
||||
private static final byte [] family = Bytes.toBytes("column");
|
||||
private static final byte [] qualifier = Bytes.toBytes("qualifier");
|
||||
private static final HRegionInfo info = new HRegionInfo(tableName,
|
||||
HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
|
||||
private static final HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
|
||||
private WAL log;
|
||||
private ReplicationWALReaderManager logManager;
|
||||
private PathWatcher pathWatcher;
|
||||
private int nbRows;
|
||||
private int walEditKVs;
|
||||
@Rule public TestName tn = new TestName();
|
||||
private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> parameters() {
|
||||
// Try out different combinations of row count and KeyValue count
|
||||
int[] NB_ROWS = { 1500, 60000 };
|
||||
int[] NB_KVS = { 1, 100 };
|
||||
// whether compression is used
|
||||
Boolean[] BOOL_VALS = { false, true };
|
||||
List<Object[]> parameters = new ArrayList<Object[]>();
|
||||
for (int nbRows : NB_ROWS) {
|
||||
for (int walEditKVs : NB_KVS) {
|
||||
for (boolean b : BOOL_VALS) {
|
||||
Object[] arr = new Object[3];
|
||||
arr[0] = nbRows;
|
||||
arr[1] = walEditKVs;
|
||||
arr[2] = b;
|
||||
parameters.add(arr);
|
||||
}
|
||||
}
|
||||
}
|
||||
return parameters;
|
||||
}
|
||||
|
||||
public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
|
||||
this.nbRows = nbRows;
|
||||
this.walEditKVs = walEditKVs;
|
||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
|
||||
enableCompression);
|
||||
mvcc.advanceTo(1);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL = new HBaseTestingUtility();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
TEST_UTIL.startMiniDFSCluster(3);
|
||||
|
||||
cluster = TEST_UTIL.getDFSCluster();
|
||||
fs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
logManager = new ReplicationWALReaderManager(fs, conf);
|
||||
List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
|
||||
pathWatcher = new PathWatcher();
|
||||
listeners.add(pathWatcher);
|
||||
final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
|
||||
log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
log.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
// Grab the path that was generated when the log rolled as part of its creation
|
||||
Path path = pathWatcher.currentPath;
|
||||
|
||||
assertEquals(0, logManager.getPosition());
|
||||
|
||||
appendToLog();
|
||||
|
||||
// There's one edit in the log, read it. Reading past it needs to return nulls
|
||||
assertNotNull(logManager.openReader(path));
|
||||
logManager.seek();
|
||||
WAL.Entry entry = logManager.readNextAndSetPosition();
|
||||
assertNotNull(entry);
|
||||
entry = logManager.readNextAndSetPosition();
|
||||
assertNull(entry);
|
||||
logManager.closeReader();
|
||||
long oldPos = logManager.getPosition();
|
||||
|
||||
appendToLog();
|
||||
|
||||
// Read the newly added entry, make sure we made progress
|
||||
assertNotNull(logManager.openReader(path));
|
||||
logManager.seek();
|
||||
entry = logManager.readNextAndSetPosition();
|
||||
assertNotEquals(oldPos, logManager.getPosition());
|
||||
assertNotNull(entry);
|
||||
logManager.closeReader();
|
||||
oldPos = logManager.getPosition();
|
||||
|
||||
log.rollWriter();
|
||||
|
||||
// We rolled but we still should see the end of the first log and not get data
|
||||
assertNotNull(logManager.openReader(path));
|
||||
logManager.seek();
|
||||
entry = logManager.readNextAndSetPosition();
|
||||
assertEquals(oldPos, logManager.getPosition());
|
||||
assertNull(entry);
|
||||
logManager.finishCurrentFile();
|
||||
|
||||
path = pathWatcher.currentPath;
|
||||
|
||||
for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
|
||||
log.rollWriter();
|
||||
logManager.openReader(path);
|
||||
logManager.seek();
|
||||
for (int i = 0; i < nbRows; i++) {
|
||||
WAL.Entry e = logManager.readNextAndSetPosition();
|
||||
if (e == null) {
|
||||
fail("Should have enough entries");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void appendToLog() throws IOException {
|
||||
appendToLogPlus(1);
|
||||
}
|
||||
|
||||
private void appendToLogPlus(int count) throws IOException {
|
||||
final long txid = log.append(htd, info,
|
||||
new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
|
||||
getWALEdits(count), true);
|
||||
log.sync(txid);
|
||||
}
|
||||
|
||||
private WALEdit getWALEdits(int count) {
|
||||
WALEdit edit = new WALEdit();
|
||||
for (int i = 0; i < count; i++) {
|
||||
edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
|
||||
System.currentTimeMillis(), qualifier));
|
||||
}
|
||||
return edit;
|
||||
}
|
||||
|
||||
class PathWatcher extends WALActionsListener.Base {
|
||||
|
||||
Path currentPath;
|
||||
|
||||
@Override
|
||||
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
|
||||
currentPath = newPath;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,440 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
@Category({ ReplicationTests.class, LargeTests.class })
|
||||
public class TestWALEntryStream {
|
||||
|
||||
private static HBaseTestingUtility TEST_UTIL;
|
||||
private static Configuration conf;
|
||||
private static FileSystem fs;
|
||||
private static MiniDFSCluster cluster;
|
||||
private static final TableName tableName = TableName.valueOf("tablename");
|
||||
private static final byte[] family = Bytes.toBytes("column");
|
||||
private static final byte[] qualifier = Bytes.toBytes("qualifier");
|
||||
private static final HRegionInfo info =
|
||||
new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
|
||||
private static final HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
private static NavigableMap<byte[], Integer> scopes;
|
||||
|
||||
private WAL log;
|
||||
PriorityBlockingQueue<Path> walQueue;
|
||||
private PathWatcher pathWatcher;
|
||||
|
||||
@Rule
|
||||
public TestName tn = new TestName();
|
||||
private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL = new HBaseTestingUtility();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
TEST_UTIL.startMiniDFSCluster(3);
|
||||
|
||||
cluster = TEST_UTIL.getDFSCluster();
|
||||
fs = cluster.getFileSystem();
|
||||
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : htd.getFamiliesKeys()) {
|
||||
scopes.put(fam, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
walQueue = new PriorityBlockingQueue<>();
|
||||
List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
|
||||
pathWatcher = new PathWatcher();
|
||||
listeners.add(pathWatcher);
|
||||
final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
|
||||
log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
log.close();
|
||||
}
|
||||
|
||||
// Try out different combinations of row count and KeyValue count
|
||||
@Test
|
||||
public void testDifferentCounts() throws Exception {
|
||||
int[] NB_ROWS = { 1500, 60000 };
|
||||
int[] NB_KVS = { 1, 100 };
|
||||
// whether compression is used
|
||||
Boolean[] BOOL_VALS = { false, true };
|
||||
// long lastPosition = 0;
|
||||
for (int nbRows : NB_ROWS) {
|
||||
for (int walEditKVs : NB_KVS) {
|
||||
for (boolean isCompressionEnabled : BOOL_VALS) {
|
||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
|
||||
isCompressionEnabled);
|
||||
mvcc.advanceTo(1);
|
||||
|
||||
for (int i = 0; i < nbRows; i++) {
|
||||
appendToLogPlus(walEditKVs);
|
||||
}
|
||||
|
||||
log.rollWriter();
|
||||
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
|
||||
int i = 0;
|
||||
for (WAL.Entry e : entryStream) {
|
||||
assertNotNull(e);
|
||||
i++;
|
||||
}
|
||||
assertEquals(nbRows, i);
|
||||
|
||||
// should've read all entries
|
||||
assertFalse(entryStream.hasNext());
|
||||
}
|
||||
// reset everything for next loop
|
||||
log.close();
|
||||
setUp();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests basic reading of log appends
|
||||
*/
|
||||
@Test
|
||||
public void testAppendsWithRolls() throws Exception {
|
||||
appendToLog();
|
||||
|
||||
long oldPos;
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
|
||||
// There's one edit in the log, read it. Reading past it needs to throw exception
|
||||
assertTrue(entryStream.hasNext());
|
||||
WAL.Entry entry = entryStream.next();
|
||||
assertNotNull(entry);
|
||||
assertFalse(entryStream.hasNext());
|
||||
try {
|
||||
entry = entryStream.next();
|
||||
fail();
|
||||
} catch (NoSuchElementException e) {
|
||||
// expected
|
||||
}
|
||||
oldPos = entryStream.getPosition();
|
||||
}
|
||||
|
||||
appendToLog();
|
||||
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
|
||||
// Read the newly added entry, make sure we made progress
|
||||
WAL.Entry entry = entryStream.next();
|
||||
assertNotEquals(oldPos, entryStream.getPosition());
|
||||
assertNotNull(entry);
|
||||
oldPos = entryStream.getPosition();
|
||||
}
|
||||
|
||||
// We rolled but we still should see the end of the first log and get that item
|
||||
appendToLog();
|
||||
log.rollWriter();
|
||||
appendToLog();
|
||||
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
|
||||
WAL.Entry entry = entryStream.next();
|
||||
assertNotEquals(oldPos, entryStream.getPosition());
|
||||
assertNotNull(entry);
|
||||
|
||||
// next item should come from the new log
|
||||
entry = entryStream.next();
|
||||
assertNotEquals(oldPos, entryStream.getPosition());
|
||||
assertNotNull(entry);
|
||||
|
||||
// no more entries to read
|
||||
assertFalse(entryStream.hasNext());
|
||||
oldPos = entryStream.getPosition();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if after a stream is opened, more entries come in and then the log is rolled, we
|
||||
* don't mistakenly dequeue the current log thinking we're done with it
|
||||
*/
|
||||
@Test
|
||||
public void testLogrollWhileStreaming() throws Exception {
|
||||
appendToLog("1");
|
||||
appendToLog("2");// 2
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
|
||||
assertEquals("1", getRow(entryStream.next()));
|
||||
|
||||
appendToLog("3"); // 3 - comes in after reader opened
|
||||
log.rollWriter(); // log roll happening while we're reading
|
||||
appendToLog("4"); // 4 - this append is in the rolled log
|
||||
|
||||
assertEquals("2", getRow(entryStream.next()));
|
||||
assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
|
||||
// entry in first log
|
||||
assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
|
||||
// and 3 would be skipped
|
||||
assertEquals("4", getRow(entryStream.next())); // 4
|
||||
assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
|
||||
assertFalse(entryStream.hasNext());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if writes come in while we have a stream open, we shouldn't miss them
|
||||
*/
|
||||
@Test
|
||||
public void testNewEntriesWhileStreaming() throws Exception {
|
||||
appendToLog("1");
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
|
||||
entryStream.next(); // we've hit the end of the stream at this point
|
||||
|
||||
// some new entries come in while we're streaming
|
||||
appendToLog("2");
|
||||
appendToLog("3");
|
||||
|
||||
// don't see them
|
||||
assertFalse(entryStream.hasNext());
|
||||
|
||||
// But we do if we reset
|
||||
entryStream.reset();
|
||||
assertEquals("2", getRow(entryStream.next()));
|
||||
assertEquals("3", getRow(entryStream.next()));
|
||||
assertFalse(entryStream.hasNext());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResumeStreamingFromPosition() throws Exception {
|
||||
long lastPosition = 0;
|
||||
appendToLog("1");
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
|
||||
entryStream.next(); // we've hit the end of the stream at this point
|
||||
appendToLog("2");
|
||||
appendToLog("3");
|
||||
lastPosition = entryStream.getPosition();
|
||||
}
|
||||
// next stream should picks up where we left off
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
|
||||
assertEquals("2", getRow(entryStream.next()));
|
||||
assertEquals("3", getRow(entryStream.next()));
|
||||
assertFalse(entryStream.hasNext()); // done
|
||||
assertEquals(1, walQueue.size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if we stop before hitting the end of a stream, we can continue where we left off
|
||||
* using the last position
|
||||
*/
|
||||
@Test
|
||||
public void testPosition() throws Exception {
|
||||
long lastPosition = 0;
|
||||
appendEntriesToLog(3);
|
||||
// read only one element
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
|
||||
entryStream.next();
|
||||
lastPosition = entryStream.getPosition();
|
||||
}
|
||||
// there should still be two more entries from where we left off
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
|
||||
assertNotNull(entryStream.next());
|
||||
assertNotNull(entryStream.next());
|
||||
assertFalse(entryStream.hasNext());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEmptyStream() throws Exception {
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
|
||||
assertFalse(entryStream.hasNext());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationSourceWALReaderThread() throws Exception {
|
||||
appendEntriesToLog(3);
|
||||
// get ending position
|
||||
long position;
|
||||
try (WALEntryStream entryStream =
|
||||
new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
|
||||
entryStream.next();
|
||||
entryStream.next();
|
||||
entryStream.next();
|
||||
position = entryStream.getPosition();
|
||||
}
|
||||
|
||||
// start up a batcher
|
||||
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
|
||||
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
||||
ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
|
||||
fs, conf, getDummyFilter(), new MetricsSource("1"));
|
||||
Path walPath = walQueue.peek();
|
||||
batcher.start();
|
||||
WALEntryBatch entryBatch = batcher.take();
|
||||
|
||||
// should've batched up our entries
|
||||
assertNotNull(entryBatch);
|
||||
assertEquals(3, entryBatch.getWalEntries().size());
|
||||
assertEquals(position, entryBatch.getLastWalPosition());
|
||||
assertEquals(walPath, entryBatch.getLastWalPath());
|
||||
assertEquals(3, entryBatch.getNbRowKeys());
|
||||
|
||||
appendToLog("foo");
|
||||
entryBatch = batcher.take();
|
||||
assertEquals(1, entryBatch.getNbEntries());
|
||||
assertEquals(getRow(entryBatch.getWalEntries().get(0)), "foo");
|
||||
}
|
||||
|
||||
private String getRow(WAL.Entry entry) {
|
||||
Cell cell = entry.getEdit().getCells().get(0);
|
||||
return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
||||
}
|
||||
|
||||
private void appendToLog(String key) throws IOException {
|
||||
final long txid = log.append(htd, info,
|
||||
new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
|
||||
getWALEdit(key), true);
|
||||
log.sync(txid);
|
||||
}
|
||||
|
||||
private void appendEntriesToLog(int count) throws IOException {
|
||||
for (int i = 0; i < count; i++) {
|
||||
appendToLog();
|
||||
}
|
||||
}
|
||||
|
||||
private void appendToLog() throws IOException {
|
||||
appendToLogPlus(1);
|
||||
}
|
||||
|
||||
private void appendToLogPlus(int count) throws IOException {
|
||||
final long txid = log.append(htd, info,
|
||||
new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
|
||||
getWALEdits(count), true);
|
||||
log.sync(txid);
|
||||
}
|
||||
|
||||
private WALEdit getWALEdits(int count) {
|
||||
WALEdit edit = new WALEdit();
|
||||
for (int i = 0; i < count; i++) {
|
||||
edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
|
||||
System.currentTimeMillis(), qualifier));
|
||||
}
|
||||
return edit;
|
||||
}
|
||||
|
||||
private WALEdit getWALEdit(String row) {
|
||||
WALEdit edit = new WALEdit();
|
||||
edit.add(
|
||||
new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier));
|
||||
return edit;
|
||||
}
|
||||
|
||||
private WALEntryFilter getDummyFilter() {
|
||||
return new WALEntryFilter() {
|
||||
|
||||
@Override
|
||||
public Entry filter(Entry entry) {
|
||||
return entry;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private ReplicationQueueInfo getQueueInfo() {
|
||||
return new ReplicationQueueInfo("1");
|
||||
}
|
||||
|
||||
class PathWatcher extends WALActionsListener.Base {
|
||||
|
||||
Path currentPath;
|
||||
|
||||
@Override
|
||||
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
|
||||
walQueue.add(newPath);
|
||||
currentPath = newPath;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue