SOLR-10983: Fix DOWNNODE -> queue-work explosion

This commit is contained in:
Scott Blum 2017-06-28 21:38:41 -04:00 committed by Scott Blum
parent d13e70f683
commit 380eed838d
3 changed files with 15 additions and 9 deletions

View File

@ -307,6 +307,8 @@ Bug Fixes
* SOLR-10879: DELETEREPLICA and DELETENODE commands should prevent data loss when * SOLR-10879: DELETEREPLICA and DELETENODE commands should prevent data loss when
replicationFactor is 1. (ab) replicationFactor is 1. (ab)
* SOLR-10983: Fix DOWNNODE -> queue-work explosion (Scott Blum, Joshua Humphries)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -228,11 +228,8 @@ public class DistributedQueue {
} }
/** /**
* Inserts data into queue. Successfully calling this method does NOT guarantee * Inserts data into queue. If there are no other queue consumers, the offered element
* that the element will be immediately available in the in-memory queue. In particular, * will be immediately visible when this method returns.
* calling this method on an empty queue will not necessarily cause {@link #poll()} to
* return the offered element. Use a blocking method if you must wait for the offered
* element to become visible.
*/ */
public void offer(byte[] data) throws KeeperException, InterruptedException { public void offer(byte[] data) throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_offer"); Timer.Context time = stats.time(dir + "_offer");

View File

@ -196,7 +196,9 @@ public class Overseer implements Closeable {
log.error("Exception in Overseer main queue loop", e); log.error("Exception in Overseer main queue loop", e);
} }
try { try {
boolean[] itemWasMoved = new boolean[1];
while (head != null) { while (head != null) {
itemWasMoved[0] = false;
byte[] data = head; byte[] data = head;
final ZkNodeProps message = ZkNodeProps.load(data); final ZkNodeProps message = ZkNodeProps.load(data);
log.debug("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion()); log.debug("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
@ -204,7 +206,11 @@ public class Overseer implements Closeable {
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() { clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
@Override @Override
public void onEnqueue() throws Exception { public void onEnqueue() throws Exception {
workQueue.offer(data); if (!itemWasMoved[0]) {
stateUpdateQueue.poll();
itemWasMoved[0] = true;
workQueue.offer(data);
}
} }
@Override @Override
@ -214,9 +220,10 @@ public class Overseer implements Closeable {
} }
}); });
// it is safer to keep this poll here because an invalid message might never be queued // If the ZkWriteCallback never fired, just dump the item, it might be an invalid message.
// and therefore we can't rely on the ZkWriteCallback to remove the item if (!itemWasMoved[0]) {
stateUpdateQueue.poll(); stateUpdateQueue.poll();
}
if (isClosed) break; if (isClosed) break;
// if an event comes in the next 100ms batch it together // if an event comes in the next 100ms batch it together