From 60ad668311fe9b32f72f81b29bfd0c57a8a2ab36 Mon Sep 17 00:00:00 2001 From: larsh Date: Tue, 12 Nov 2013 22:13:01 +0000 Subject: [PATCH] HBASE-9865 Reused WALEdits in replication may cause RegionServers to go OOM git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1541265 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/regionserver/wal/WALEdit.java | 7 ++- .../ReplicationHLogReaderManager.java | 7 +-- .../regionserver/ReplicationSource.java | 56 ++++++++----------- .../hadoop/hbase/mapreduce/TestWALPlayer.java | 2 +- .../TestReplicationHLogReaderManager.java | 11 ++-- 5 files changed, 35 insertions(+), 48 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index d8b163f02d2..d4b36a603a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -85,7 +85,7 @@ public class WALEdit implements Writable, HeapSize { private final int VERSION_2 = -1; private final boolean isReplay; - private final ArrayList kvs = new ArrayList(); + private final ArrayList kvs = new ArrayList(1); // Only here for legacy writable deserialization @Deprecated @@ -134,7 +134,7 @@ public class WALEdit implements Writable, HeapSize { return kvs.size(); } - public List getKeyValues() { + public ArrayList getKeyValues() { return kvs; } @@ -210,6 +210,7 @@ public class WALEdit implements Writable, HeapSize { */ public int readFromCells(Codec.Decoder cellDecoder, int expectedCount) throws IOException { kvs.clear(); + kvs.ensureCapacity(expectedCount); while (kvs.size() < expectedCount && cellDecoder.advance()) { Cell cell = cellDecoder.current(); if (!(cell instanceof KeyValue)) { @@ -221,7 +222,7 @@ public class WALEdit implements Writable, HeapSize { } public long heapSize() { - long ret = 0; + long ret = ClassSize.ARRAYLIST; for (KeyValue kv : kvs) { ret += kv.heapSize(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java index fe6924c91e0..6f8b7f50e88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java @@ -79,14 +79,11 @@ public class ReplicationHLogReaderManager { /** * Get the next entry, returned and also added in the array - * @param entriesArray - * @param currentNbEntries * @return a new entry or null * @throws IOException */ - public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray, - int currentNbEntries) throws IOException { - HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]); + public HLog.Entry readNextAndSetPosition() throws IOException { + HLog.Entry entry = this.reader.next(); // Store the position so that in the future the reader can start // reading from here. If the above call to next() throws an // exception, the position won't be changed and retry will happen diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 8e6d17de86d..fba5d4f58b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -23,7 +23,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.ConnectException; import java.net.SocketTimeoutException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.NavigableMap; @@ -76,8 +76,6 @@ public class ReplicationSource extends Thread public static final Log LOG = LogFactory.getLog(ReplicationSource.class); // Queue of logs to process private PriorityBlockingQueue queue; - // container of entries to replicate - private HLog.Entry[] entriesArray; private HConnection conn; private ReplicationQueues replicationQueues; private ReplicationPeers replicationPeers; @@ -116,8 +114,6 @@ public class ReplicationSource extends Thread private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS private int socketTimeoutMultiplier; - // Current number of entries that we need to replicate - private int currentNbEntries = 0; // Current number of operations (Put/Delete) that we need to replicate private int currentNbOperations = 0; // Current size of data we need to replicate @@ -153,10 +149,6 @@ public class ReplicationSource extends Thread this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); - this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity]; - for (int i = 0; i < this.replicationQueueNbCapacity; i++) { - this.entriesArray[i] = new HLog.Entry(); - } this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier * maxRetriesMultiplier); @@ -289,10 +281,10 @@ public class ReplicationSource extends Thread boolean gotIOE = false; currentNbOperations = 0; - currentNbEntries = 0; + List entries = new ArrayList(1); currentSize = 0; try { - if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) { + if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) { continue; } } catch (IOException ioe) { @@ -311,11 +303,6 @@ public class ReplicationSource extends Thread } catch (IOException e) { LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e); } - } else if (currentNbEntries != 0) { - LOG.warn(this.peerClusterZnode + - " Got EOF while reading, " + "looks like this file is broken? " + currentPath); - considerDumping = true; - currentNbEntries = 0; } if (considerDumping && @@ -337,7 +324,7 @@ public class ReplicationSource extends Thread // If we didn't get anything to replicate, or if we hit a IOE, // wait a bit and retry. // But if we need to stop, don't bother sleeping - if (this.isActive() && (gotIOE || currentNbEntries == 0)) { + if (this.isActive() && (gotIOE || entries.isEmpty())) { if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.repLogReader.getPosition(), @@ -354,8 +341,7 @@ public class ReplicationSource extends Thread continue; } sleepMultiplier = 1; - shipEdits(currentWALisBeingWrittenTo); - + shipEdits(currentWALisBeingWrittenTo, entries); } if (this.conn != null) { try { @@ -372,11 +358,12 @@ public class ReplicationSource extends Thread * Read all the entries from the current log files and retain those * that need to be replicated. Else, process the end of the current file. * @param currentWALisBeingWrittenTo is the current WAL being written to + * @param entries resulting entries to be replicated * @return true if we got nothing and went to the next file, false if we got * entries * @throws IOException */ - protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo) + protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List entries) throws IOException{ long seenEntries = 0; if (LOG.isTraceEnabled()) { @@ -385,7 +372,7 @@ public class ReplicationSource extends Thread } this.repLogReader.seek(); HLog.Entry entry = - this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); + this.repLogReader.readNextAndSetPosition(); while (entry != null) { WALEdit edit = entry.getEdit(); this.metrics.incrLogEditsRead(); @@ -402,7 +389,7 @@ public class ReplicationSource extends Thread //Mark that the current cluster has the change logKey.addClusterId(clusterId); currentNbOperations += countDistinctRowKeys(edit); - currentNbEntries++; + entries.add(entry); currentSize += entry.getEdit().heapSize(); } else { this.metrics.incrLogEditsFiltered(); @@ -410,11 +397,11 @@ public class ReplicationSource extends Thread } // Stop if too many entries or too big if (currentSize >= this.replicationQueueSizeCapacity || - currentNbEntries >= this.replicationQueueNbCapacity) { + entries.size() >= this.replicationQueueNbCapacity) { break; } try { - entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); + entry = this.repLogReader.readNextAndSetPosition(); } catch (IOException ie) { LOG.debug("Break on IOE: " + ie.getMessage()); break; @@ -583,8 +570,9 @@ public class ReplicationSource extends Thread */ protected void removeNonReplicableEdits(HLog.Entry entry) { NavigableMap scopes = entry.getKey().getScopes(); - List kvs = entry.getEdit().getKeyValues(); - for (int i = kvs.size()-1; i >= 0; i--) { + ArrayList kvs = entry.getEdit().getKeyValues(); + int size = kvs.size(); + for (int i = size-1; i >= 0; i--) { KeyValue kv = kvs.get(i); // The scope will be null or empty if // there's nothing to replicate in that WALEdit @@ -592,6 +580,9 @@ public class ReplicationSource extends Thread kvs.remove(i); } } + if (kvs.size() < size/2) { + kvs.trimToSize(); + } } /** @@ -617,9 +608,9 @@ public class ReplicationSource extends Thread * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) * written to when this method was called */ - protected void shipEdits(boolean currentWALisBeingWrittenTo) { + protected void shipEdits(boolean currentWALisBeingWrittenTo, List entries) { int sleepMultiplier = 1; - if (this.currentNbEntries == 0) { + if (entries.isEmpty()) { LOG.warn("Was given 0 edits to ship"); return; } @@ -635,22 +626,21 @@ public class ReplicationSource extends Thread sinkPeer = replicationSinkMgr.getReplicationSink(); BlockingInterface rrs = sinkPeer.getRegionServer(); if (LOG.isTraceEnabled()) { - LOG.trace("Replicating " + this.currentNbEntries + + LOG.trace("Replicating " + entries.size() + " entries of total size " + currentSize); } ReplicationProtbufUtil.replicateWALEntry(rrs, - Arrays.copyOf(this.entriesArray, currentNbEntries)); + entries.toArray(new HLog.Entry[entries.size()])); if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); this.lastLoggedPosition = this.repLogReader.getPosition(); } - this.totalReplicatedEdits += currentNbEntries; + this.totalReplicatedEdits += entries.size(); this.totalReplicatedOperations += currentNbOperations; this.metrics.shipBatch(this.currentNbOperations); - this.metrics.setAgeOfLastShippedOp( - this.entriesArray[currentNbEntries-1].getKey().getWriteTime()); + this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); if (LOG.isTraceEnabled()) { LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or " + this.totalReplicatedOperations + " operations"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 64f98741696..309af73c51c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -143,7 +143,7 @@ public class TestWALPlayer { when(context.getConfiguration()).thenReturn(configuration); WALEdit value = mock(WALEdit.class); - List values = new ArrayList(); + ArrayList values = new ArrayList(); KeyValue kv1 = mock(KeyValue.class); when(kv1.getFamily()).thenReturn(Bytes.toBytes("family")); when(kv1.getRow()).thenReturn(Bytes.toBytes("row")); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java index 98a87dbc269..705b3ae9ab6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java @@ -154,10 +154,9 @@ public class TestReplicationHLogReaderManager { // There's one edit in the log, read it. Reading past it needs to return nulls assertNotNull(logManager.openReader(path)); logManager.seek(); - HLog.Entry[] entriesArray = new HLog.Entry[1]; - HLog.Entry entry = logManager.readNextAndSetPosition(entriesArray, 0); + HLog.Entry entry = logManager.readNextAndSetPosition(); assertNotNull(entry); - entry = logManager.readNextAndSetPosition(entriesArray, 0); + entry = logManager.readNextAndSetPosition(); assertNull(entry); logManager.closeReader(); long oldPos = logManager.getPosition(); @@ -167,7 +166,7 @@ public class TestReplicationHLogReaderManager { // Read the newly added entry, make sure we made progress assertNotNull(logManager.openReader(path)); logManager.seek(); - entry = logManager.readNextAndSetPosition(entriesArray, 0); + entry = logManager.readNextAndSetPosition(); assertNotEquals(oldPos, logManager.getPosition()); assertNotNull(entry); logManager.closeReader(); @@ -178,7 +177,7 @@ public class TestReplicationHLogReaderManager { // We rolled but we still should see the end of the first log and not get data assertNotNull(logManager.openReader(path)); logManager.seek(); - entry = logManager.readNextAndSetPosition(entriesArray, 0); + entry = logManager.readNextAndSetPosition(); assertEquals(oldPos, logManager.getPosition()); assertNull(entry); logManager.finishCurrentFile(); @@ -196,7 +195,7 @@ public class TestReplicationHLogReaderManager { logManager.openReader(path); logManager.seek(); for (int i = 0; i < nbRows; i++) { - HLog.Entry e = logManager.readNextAndSetPosition(entriesArray, 0); + HLog.Entry e = logManager.readNextAndSetPosition(); if (e == null) { fail("Should have enough entries"); }