HBASE-9865 Reused WALEdits in replication may cause RegionServers to go OOM

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1541265 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2013-11-12 22:13:01 +00:00
parent 7f28831af2
commit 60ad668311
5 changed files with 35 additions and 48 deletions

View File

@ -85,7 +85,7 @@ public class WALEdit implements Writable, HeapSize {
private final int VERSION_2 = -1; private final int VERSION_2 = -1;
private final boolean isReplay; private final boolean isReplay;
private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(); private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(1);
// Only here for legacy writable deserialization // Only here for legacy writable deserialization
@Deprecated @Deprecated
@ -134,7 +134,7 @@ public class WALEdit implements Writable, HeapSize {
return kvs.size(); return kvs.size();
} }
public List<KeyValue> getKeyValues() { public ArrayList<KeyValue> getKeyValues() {
return kvs; return kvs;
} }
@ -210,6 +210,7 @@ public class WALEdit implements Writable, HeapSize {
*/ */
public int readFromCells(Codec.Decoder cellDecoder, int expectedCount) throws IOException { public int readFromCells(Codec.Decoder cellDecoder, int expectedCount) throws IOException {
kvs.clear(); kvs.clear();
kvs.ensureCapacity(expectedCount);
while (kvs.size() < expectedCount && cellDecoder.advance()) { while (kvs.size() < expectedCount && cellDecoder.advance()) {
Cell cell = cellDecoder.current(); Cell cell = cellDecoder.current();
if (!(cell instanceof KeyValue)) { if (!(cell instanceof KeyValue)) {
@ -221,7 +222,7 @@ public class WALEdit implements Writable, HeapSize {
} }
public long heapSize() { public long heapSize() {
long ret = 0; long ret = ClassSize.ARRAYLIST;
for (KeyValue kv : kvs) { for (KeyValue kv : kvs) {
ret += kv.heapSize(); ret += kv.heapSize();
} }

View File

@ -79,14 +79,11 @@ public class ReplicationHLogReaderManager {
/** /**
* Get the next entry, returned and also added in the array * Get the next entry, returned and also added in the array
* @param entriesArray
* @param currentNbEntries
* @return a new entry or null * @return a new entry or null
* @throws IOException * @throws IOException
*/ */
public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray, public HLog.Entry readNextAndSetPosition() throws IOException {
int currentNbEntries) throws IOException { HLog.Entry entry = this.reader.next();
HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
// Store the position so that in the future the reader can start // Store the position so that in the future the reader can start
// reading from here. If the above call to next() throws an // reading from here. If the above call to next() throws an
// exception, the position won't be changed and retry will happen // exception, the position won't be changed and retry will happen

View File

@ -23,7 +23,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.Arrays; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.NavigableMap; import java.util.NavigableMap;
@ -76,8 +76,6 @@ public class ReplicationSource extends Thread
public static final Log LOG = LogFactory.getLog(ReplicationSource.class); public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queue of logs to process // Queue of logs to process
private PriorityBlockingQueue<Path> queue; private PriorityBlockingQueue<Path> queue;
// container of entries to replicate
private HLog.Entry[] entriesArray;
private HConnection conn; private HConnection conn;
private ReplicationQueues replicationQueues; private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers; private ReplicationPeers replicationPeers;
@ -116,8 +114,6 @@ public class ReplicationSource extends Thread
private int maxRetriesMultiplier; private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS // Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier; private int socketTimeoutMultiplier;
// Current number of entries that we need to replicate
private int currentNbEntries = 0;
// Current number of operations (Put/Delete) that we need to replicate // Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0; private int currentNbOperations = 0;
// Current size of data we need to replicate // Current size of data we need to replicate
@ -153,10 +149,6 @@ public class ReplicationSource extends Thread
this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity = this.replicationQueueNbCapacity =
this.conf.getInt("replication.source.nb.capacity", 25000); this.conf.getInt("replication.source.nb.capacity", 25000);
this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
this.entriesArray[i] = new HLog.Entry();
}
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
maxRetriesMultiplier * maxRetriesMultiplier); maxRetriesMultiplier * maxRetriesMultiplier);
@ -289,10 +281,10 @@ public class ReplicationSource extends Thread
boolean gotIOE = false; boolean gotIOE = false;
currentNbOperations = 0; currentNbOperations = 0;
currentNbEntries = 0; List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
currentSize = 0; currentSize = 0;
try { try {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) { if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
continue; continue;
} }
} catch (IOException ioe) { } catch (IOException ioe) {
@ -311,11 +303,6 @@ public class ReplicationSource extends Thread
} catch (IOException e) { } catch (IOException e) {
LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e); LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
} }
} else if (currentNbEntries != 0) {
LOG.warn(this.peerClusterZnode +
" Got EOF while reading, " + "looks like this file is broken? " + currentPath);
considerDumping = true;
currentNbEntries = 0;
} }
if (considerDumping && if (considerDumping &&
@ -337,7 +324,7 @@ public class ReplicationSource extends Thread
// If we didn't get anything to replicate, or if we hit a IOE, // If we didn't get anything to replicate, or if we hit a IOE,
// wait a bit and retry. // wait a bit and retry.
// But if we need to stop, don't bother sleeping // But if we need to stop, don't bother sleeping
if (this.isActive() && (gotIOE || currentNbEntries == 0)) { if (this.isActive() && (gotIOE || entries.isEmpty())) {
if (this.lastLoggedPosition != this.repLogReader.getPosition()) { if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath, this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.repLogReader.getPosition(), this.peerClusterZnode, this.repLogReader.getPosition(),
@ -354,8 +341,7 @@ public class ReplicationSource extends Thread
continue; continue;
} }
sleepMultiplier = 1; sleepMultiplier = 1;
shipEdits(currentWALisBeingWrittenTo); shipEdits(currentWALisBeingWrittenTo, entries);
} }
if (this.conn != null) { if (this.conn != null) {
try { try {
@ -372,11 +358,12 @@ public class ReplicationSource extends Thread
* Read all the entries from the current log files and retain those * Read all the entries from the current log files and retain those
* that need to be replicated. Else, process the end of the current file. * that need to be replicated. Else, process the end of the current file.
* @param currentWALisBeingWrittenTo is the current WAL being written to * @param currentWALisBeingWrittenTo is the current WAL being written to
* @param entries resulting entries to be replicated
* @return true if we got nothing and went to the next file, false if we got * @return true if we got nothing and went to the next file, false if we got
* entries * entries
* @throws IOException * @throws IOException
*/ */
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo) protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries)
throws IOException{ throws IOException{
long seenEntries = 0; long seenEntries = 0;
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -385,7 +372,7 @@ public class ReplicationSource extends Thread
} }
this.repLogReader.seek(); this.repLogReader.seek();
HLog.Entry entry = HLog.Entry entry =
this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); this.repLogReader.readNextAndSetPosition();
while (entry != null) { while (entry != null) {
WALEdit edit = entry.getEdit(); WALEdit edit = entry.getEdit();
this.metrics.incrLogEditsRead(); this.metrics.incrLogEditsRead();
@ -402,7 +389,7 @@ public class ReplicationSource extends Thread
//Mark that the current cluster has the change //Mark that the current cluster has the change
logKey.addClusterId(clusterId); logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit); currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++; entries.add(entry);
currentSize += entry.getEdit().heapSize(); currentSize += entry.getEdit().heapSize();
} else { } else {
this.metrics.incrLogEditsFiltered(); this.metrics.incrLogEditsFiltered();
@ -410,11 +397,11 @@ public class ReplicationSource extends Thread
} }
// Stop if too many entries or too big // Stop if too many entries or too big
if (currentSize >= this.replicationQueueSizeCapacity || if (currentSize >= this.replicationQueueSizeCapacity ||
currentNbEntries >= this.replicationQueueNbCapacity) { entries.size() >= this.replicationQueueNbCapacity) {
break; break;
} }
try { try {
entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); entry = this.repLogReader.readNextAndSetPosition();
} catch (IOException ie) { } catch (IOException ie) {
LOG.debug("Break on IOE: " + ie.getMessage()); LOG.debug("Break on IOE: " + ie.getMessage());
break; break;
@ -583,8 +570,9 @@ public class ReplicationSource extends Thread
*/ */
protected void removeNonReplicableEdits(HLog.Entry entry) { protected void removeNonReplicableEdits(HLog.Entry entry) {
NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes(); NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
List<KeyValue> kvs = entry.getEdit().getKeyValues(); ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
for (int i = kvs.size()-1; i >= 0; i--) { int size = kvs.size();
for (int i = size-1; i >= 0; i--) {
KeyValue kv = kvs.get(i); KeyValue kv = kvs.get(i);
// The scope will be null or empty if // The scope will be null or empty if
// there's nothing to replicate in that WALEdit // there's nothing to replicate in that WALEdit
@ -592,6 +580,9 @@ public class ReplicationSource extends Thread
kvs.remove(i); kvs.remove(i);
} }
} }
if (kvs.size() < size/2) {
kvs.trimToSize();
}
} }
/** /**
@ -617,9 +608,9 @@ public class ReplicationSource extends Thread
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly) * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
* written to when this method was called * written to when this method was called
*/ */
protected void shipEdits(boolean currentWALisBeingWrittenTo) { protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
int sleepMultiplier = 1; int sleepMultiplier = 1;
if (this.currentNbEntries == 0) { if (entries.isEmpty()) {
LOG.warn("Was given 0 edits to ship"); LOG.warn("Was given 0 edits to ship");
return; return;
} }
@ -635,22 +626,21 @@ public class ReplicationSource extends Thread
sinkPeer = replicationSinkMgr.getReplicationSink(); sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer(); BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Replicating " + this.currentNbEntries + LOG.trace("Replicating " + entries.size() +
" entries of total size " + currentSize); " entries of total size " + currentSize);
} }
ReplicationProtbufUtil.replicateWALEntry(rrs, ReplicationProtbufUtil.replicateWALEntry(rrs,
Arrays.copyOf(this.entriesArray, currentNbEntries)); entries.toArray(new HLog.Entry[entries.size()]));
if (this.lastLoggedPosition != this.repLogReader.getPosition()) { if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath, this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.repLogReader.getPosition(), this.peerClusterZnode, this.repLogReader.getPosition(),
this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition(); this.lastLoggedPosition = this.repLogReader.getPosition();
} }
this.totalReplicatedEdits += currentNbEntries; this.totalReplicatedEdits += entries.size();
this.totalReplicatedOperations += currentNbOperations; this.totalReplicatedOperations += currentNbOperations;
this.metrics.shipBatch(this.currentNbOperations); this.metrics.shipBatch(this.currentNbOperations);
this.metrics.setAgeOfLastShippedOp( this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or " LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
+ this.totalReplicatedOperations + " operations"); + this.totalReplicatedOperations + " operations");

View File

@ -143,7 +143,7 @@ public class TestWALPlayer {
when(context.getConfiguration()).thenReturn(configuration); when(context.getConfiguration()).thenReturn(configuration);
WALEdit value = mock(WALEdit.class); WALEdit value = mock(WALEdit.class);
List<KeyValue> values = new ArrayList<KeyValue>(); ArrayList<KeyValue> values = new ArrayList<KeyValue>();
KeyValue kv1 = mock(KeyValue.class); KeyValue kv1 = mock(KeyValue.class);
when(kv1.getFamily()).thenReturn(Bytes.toBytes("family")); when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
when(kv1.getRow()).thenReturn(Bytes.toBytes("row")); when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));

View File

@ -154,10 +154,9 @@ public class TestReplicationHLogReaderManager {
// There's one edit in the log, read it. Reading past it needs to return nulls // There's one edit in the log, read it. Reading past it needs to return nulls
assertNotNull(logManager.openReader(path)); assertNotNull(logManager.openReader(path));
logManager.seek(); logManager.seek();
HLog.Entry[] entriesArray = new HLog.Entry[1]; HLog.Entry entry = logManager.readNextAndSetPosition();
HLog.Entry entry = logManager.readNextAndSetPosition(entriesArray, 0);
assertNotNull(entry); assertNotNull(entry);
entry = logManager.readNextAndSetPosition(entriesArray, 0); entry = logManager.readNextAndSetPosition();
assertNull(entry); assertNull(entry);
logManager.closeReader(); logManager.closeReader();
long oldPos = logManager.getPosition(); long oldPos = logManager.getPosition();
@ -167,7 +166,7 @@ public class TestReplicationHLogReaderManager {
// Read the newly added entry, make sure we made progress // Read the newly added entry, make sure we made progress
assertNotNull(logManager.openReader(path)); assertNotNull(logManager.openReader(path));
logManager.seek(); logManager.seek();
entry = logManager.readNextAndSetPosition(entriesArray, 0); entry = logManager.readNextAndSetPosition();
assertNotEquals(oldPos, logManager.getPosition()); assertNotEquals(oldPos, logManager.getPosition());
assertNotNull(entry); assertNotNull(entry);
logManager.closeReader(); logManager.closeReader();
@ -178,7 +177,7 @@ public class TestReplicationHLogReaderManager {
// We rolled but we still should see the end of the first log and not get data // We rolled but we still should see the end of the first log and not get data
assertNotNull(logManager.openReader(path)); assertNotNull(logManager.openReader(path));
logManager.seek(); logManager.seek();
entry = logManager.readNextAndSetPosition(entriesArray, 0); entry = logManager.readNextAndSetPosition();
assertEquals(oldPos, logManager.getPosition()); assertEquals(oldPos, logManager.getPosition());
assertNull(entry); assertNull(entry);
logManager.finishCurrentFile(); logManager.finishCurrentFile();
@ -196,7 +195,7 @@ public class TestReplicationHLogReaderManager {
logManager.openReader(path); logManager.openReader(path);
logManager.seek(); logManager.seek();
for (int i = 0; i < nbRows; i++) { for (int i = 0; i < nbRows; i++) {
HLog.Entry e = logManager.readNextAndSetPosition(entriesArray, 0); HLog.Entry e = logManager.readNextAndSetPosition();
if (e == null) { if (e == null) {
fail("Should have enough entries"); fail("Should have enough entries");
} }