SOLR-11443: Remove the usage of workqueue for Overseer

This commit is contained in:
Cao Manh Dat 2017-10-14 09:33:17 +07:00
parent ec81838203
commit 9543e85460
6 changed files with 83 additions and 87 deletions

View File

@ -21,10 +21,12 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -46,6 +48,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.handler.admin.CollectionsHandler;
@ -84,6 +87,7 @@ public class Overseer implements Closeable {
private final String myId;
//queue where everybody can throw tasks
private final ZkDistributedQueue stateUpdateQueue;
//TODO remove in 9.0, we do not push message into this queue anymore
//Internal queue where overseer stores events that have not yet been published into cloudstate
//If Overseer dies while extracting the main queue a new overseer will start from this queue
private final ZkDistributedQueue workQueue;
@ -132,6 +136,10 @@ public class Overseer implements Closeable {
ZkStateWriter zkStateWriter = null;
ClusterState clusterState = null;
boolean refreshClusterState = true; // let's refresh in the first iteration
// we write updates in batch, but if an exception is thrown when writing new clusterstate,
// we do not sure which message is bad message, therefore we will re-process node one by one
int fallbackQueueSize = Integer.MAX_VALUE;
ZkDistributedQueue fallbackQueue = workQueue;
while (!this.isClosed) {
isLeader = amILeader();
if (LeaderStatus.NO == isLeader) {
@ -153,11 +161,10 @@ public class Overseer implements Closeable {
// if there were any errors while processing
// the state queue, items would have been left in the
// work queue so let's process those first
byte[] data = workQueue.peek();
boolean hadWorkItems = data != null;
while (data != null) {
byte[] data = fallbackQueue.peek();
while (fallbackQueueSize > 0 && data != null) {
final ZkNodeProps message = ZkNodeProps.load(data);
log.debug("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
log.debug("processMessage: fallbackQueueSize: {}, message = {}", fallbackQueue.getStats().getQueueLength(), message);
// force flush to ZK after each message because there is no fallback if workQueue items
// are removed from workQueue but fail to be written to ZK
try {
@ -165,17 +172,19 @@ public class Overseer implements Closeable {
} catch (Exception e) {
if (isBadMessage(e)) {
log.warn("Exception when process message = {}, consider as bad message and poll out from the queue", message);
workQueue.poll();
fallbackQueue.poll();
}
throw e;
}
workQueue.poll(); // poll-ing removes the element we got by peek-ing
data = workQueue.peek();
}
// force flush at the end of the loop
if (hadWorkItems) {
clusterState = zkStateWriter.writePendingUpdates();
fallbackQueue.poll(); // poll-ing removes the element we got by peek-ing
data = fallbackQueue.peek();
fallbackQueueSize--;
}
// force flush at the end of the loop, if there are no pending updates, this is a no op call
clusterState = zkStateWriter.writePendingUpdates();
// the workQueue is empty now, use stateUpdateQueue as fallback queue
fallbackQueue = stateUpdateQueue;
fallbackQueueSize = 0;
} catch (KeeperException.SessionExpiredException e) {
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
return;
@ -189,9 +198,10 @@ public class Overseer implements Closeable {
}
}
byte[] head = null;
LinkedList<Pair<String, byte[]>> queue = null;
try {
head = stateUpdateQueue.peek(true);
// We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, Long.MAX_VALUE, (x) -> true));
} catch (KeeperException.SessionExpiredException e) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
@ -202,45 +212,32 @@ public class Overseer implements Closeable {
log.error("Exception in Overseer main queue loop", e);
}
try {
boolean[] itemWasMoved = new boolean[1];
while (head != null) {
itemWasMoved[0] = false;
byte[] data = head;
final ZkNodeProps message = ZkNodeProps.load(data);
log.debug("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
// we can batch here because workQueue is our fallback in case a ZK write failed
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
@Override
public void onEnqueue() throws Exception {
if (!itemWasMoved[0]) {
workQueue.offer(data);
stateUpdateQueue.poll();
itemWasMoved[0] = true;
}
}
Set<String> processedNodes = new HashSet<>();
while (queue != null && !queue.isEmpty()) {
for (Pair<String, byte[]> head : queue) {
byte[] data = head.second();
final ZkNodeProps message = ZkNodeProps.load(data);
log.debug("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
@Override
public void onWrite() throws Exception {
// remove everything from workQueue
while (workQueue.poll() != null);
}
});
// If the ZkWriteCallback never fired, just dump the item, it might be an invalid message.
if (!itemWasMoved[0]) {
stateUpdateQueue.poll();
processedNodes.add(head.first());
fallbackQueueSize = processedNodes.size();
// The callback always be called on this thread
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, () -> {
stateUpdateQueue.remove(processedNodes);
processedNodes.clear();
});
}
if (isClosed) break;
// if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100);
queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, 100, node -> !processedNodes.contains(node)));
}
fallbackQueueSize = processedNodes.size();
// we should force write all pending updates because the next iteration might sleep until there
// are more items in the main queue
clusterState = zkStateWriter.writePendingUpdates();
// clean work queue
while (workQueue.poll() != null);
stateUpdateQueue.remove(processedNodes);
processedNodes.clear();
} catch (KeeperException.SessionExpiredException e) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;

View File

@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.util.Pair;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
@ -224,6 +225,42 @@ public class ZkDistributedQueue implements DistributedQueue {
}
}
public void remove(Collection<String> paths) throws KeeperException, InterruptedException {
if (paths.isEmpty()) return;
List<Op> ops = new ArrayList<>();
for (String path : paths) {
ops.add(Op.delete(dir + "/" + path, -1));
}
for (int from = 0; from < ops.size(); from += 1000) {
int to = Math.min(from + 1000, ops.size());
if (from < to) {
try {
zookeeper.multi(ops.subList(from, to), true);
} catch (KeeperException.NoNodeException e) {
// don't know which nodes are not exist, so try to delete one by one node
for (int j = from; j < to; j++) {
try {
zookeeper.delete(ops.get(j).getPath(), -1, true);
} catch (KeeperException.NoNodeException e2) {
LOG.debug("Can not remove node which is not exist : " + ops.get(j).getPath());
}
}
}
}
}
int cacheSizeBefore = knownChildren.size();
knownChildren.removeAll(paths);
if (cacheSizeBefore - paths.size() == knownChildren.size() && knownChildren.size() != 0) {
stats.setQueueLength(knownChildren.size());
} else {
// There are elements get deleted but not present in the cache,
// the cache seems not valid anymore
knownChildren.clear();
isDirty = true;
}
}
/**
* Removes the head of the queue and returns it, blocks until it succeeds.
*

View File

@ -112,10 +112,6 @@ public class ZkStateWriter {
if (cmds.isEmpty()) return prevState;
if (isNoOps(cmds)) return prevState;
if (callback != null) {
callback.onEnqueue();
}
for (ZkWriteCommand cmd : cmds) {
if (cmd == NO_OP) continue;
if (!isClusterStateModified && clusterStateGetModifiedWith(cmd, prevState)) {
@ -253,15 +249,10 @@ public class ZkStateWriter {
}
public interface ZkWriteCallback {
/**
* Called by ZkStateWriter if a ZkWriteCommand is queued
*/
public void onEnqueue() throws Exception;
/**
* Called by ZkStateWriter if state is flushed to ZK
*/
public void onWrite() throws Exception;
void onWrite() throws Exception;
}
}

View File

@ -191,7 +191,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
doAnswer(invocation -> {
queue.remove(invocation.getArgument(0));
return null;
}).when(workQueueMock).remove(any());
}).when(workQueueMock).remove(any(QueueEvent.class));
when(workQueueMock.poll()).thenAnswer(invocation -> {
queue.poll();

View File

@ -45,17 +45,7 @@ import org.slf4j.LoggerFactory;
public class ZkStateWriterTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final ZkStateWriter.ZkWriteCallback FAIL_ON_WRITE = new ZkStateWriter.ZkWriteCallback() {
@Override
public void onEnqueue() throws Exception {
}
@Override
public void onWrite() throws Exception {
fail("Got unexpected flush");
}
};
private static final ZkStateWriter.ZkWriteCallback FAIL_ON_WRITE = () -> fail("Got unexpected flush");
@BeforeClass
public static void setup() {
@ -106,31 +96,11 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
AtomicBoolean didWrite = new AtomicBoolean(false);
clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), new ZkStateWriter.ZkWriteCallback() {
@Override
public void onEnqueue() throws Exception {
}
@Override
public void onWrite() throws Exception {
didWrite.set(true);
}
});
clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), () -> didWrite.set(true));
assertTrue("Exceed the update delay, should be flushed", didWrite.get());
for (int i = 0; i <= Overseer.STATE_UPDATE_BATCH_SIZE; i++) {
clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), new ZkStateWriter.ZkWriteCallback() {
@Override
public void onEnqueue() throws Exception {
}
@Override
public void onWrite() throws Exception {
didWrite.set(true);
}
});
clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), () -> didWrite.set(true));
}
assertTrue("Exceed the update batch size, should be flushed", didWrite.get());
}

View File

@ -38,4 +38,5 @@ public interface DistributedQueue {
Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws Exception;
void remove(Collection<String> paths) throws Exception;
}