SOLR-11739: Don't accept duplicate async IDs in collection API operations

This commit is contained in:
Tomas Fernandez Lobbe 2018-02-15 15:41:48 -08:00
parent 53640b95d8
commit 61ea8f60b1
13 changed files with 456 additions and 50 deletions

View File

@ -208,6 +208,8 @@ Bug Fixes
* SOLR-11950: Allow CLUSTERSTATUS "shard" parameter to accept comma (,) delimited list (Chris Ulicny via Jason Gerlowski)
* SOLR-11739: Fix race condition that made Solr accept duplicate async IDs in collection API operations (Tomás Fernánadez Löbbe)
Optimizations
----------------------

View File

@ -16,16 +16,18 @@
*/
package org.apache.solr.cloud;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import java.util.List;
/**
* A distributed map.
* This supports basic map functions e.g. get, put, contains for interaction with zk which
@ -58,6 +60,19 @@ public class DistributedMap {
public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
zookeeper.makePath(dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, false, true);
}
/**
* Puts an element in the map only if there isn't one with the same trackingId already
* @return True if the the element was added. False if it wasn't (because the key already exists)
*/
public boolean putIfAbsent(String trackingId, byte[] data) throws KeeperException, InterruptedException {
try {
zookeeper.makePath(dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, true, true);
return true;
} catch (NodeExistsException e) {
return false;
}
}
public byte[] get(String trackingId) throws KeeperException, InterruptedException {
return zookeeper.getData(dir + "/" + PREFIX + trackingId, null, null, true);
@ -97,5 +112,16 @@ public class DistributedMap {
}
}
/**
* Returns the keys of all the elements in the map
*/
public Collection<String> keys() throws KeeperException, InterruptedException {
List<String> childs = zookeeper.getChildren(dir, null, true);
final List<String> ids = new ArrayList<>(childs.size());
childs.stream().forEach((child) -> ids.add(child.substring(PREFIX.length())));
return ids;
}
}

View File

@ -679,13 +679,19 @@ public class Overseer implements SolrCloseable {
/* Size-limited map for successfully completed tasks*/
static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", NUM_RESPONSES_TO_STORE);
return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
}
/* Map for failed tasks, not to be used outside of the Overseer */
static DistributedMap getFailureMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE);
return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
}
/* Map of async IDs currently in use*/
static DistributedMap getAsyncIdsMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
return new DistributedMap(zkClient, "/overseer/async_ids");
}
/**
@ -770,6 +776,7 @@ public class Overseer implements SolrCloseable {
createOverseerNode(zkClient);
return getCollectionQueue(zkClient, zkStats);
}
private static void createOverseerNode(final SolrZkClient zkClient) {
try {

View File

@ -70,7 +70,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
if (data != null) {
ZkNodeProps message = ZkNodeProps.load(data);
if (message.containsKey(requestIdKey)) {
LOG.debug(">>>> {}", message.get(requestIdKey));
LOG.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
if(message.get(requestIdKey).equals(requestId)) return true;
}
}

View File

@ -17,7 +17,6 @@
package org.apache.solr.cloud;
import java.util.List;
import org.apache.lucene.util.PriorityQueue;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.zookeeper.KeeperException;
@ -34,9 +33,19 @@ public class SizeLimitedDistributedMap extends DistributedMap {
private final int maxSize;
/**
* This observer will be called when this map overflows, and deletes the excess of elements
*/
private final OnOverflowObserver onOverflowObserver;
public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize) {
this(zookeeper, dir, maxSize, null);
}
public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize, OnOverflowObserver onOverflowObserver) {
super(zookeeper, dir);
this.maxSize = maxSize;
this.onOverflowObserver = onOverflowObserver;
}
@Override
@ -47,7 +56,7 @@ public class SizeLimitedDistributedMap extends DistributedMap {
int cleanupSize = maxSize / 10;
final PriorityQueue priorityQueue = new PriorityQueue<Long>(cleanupSize) {
final PriorityQueue<Long> priorityQueue = new PriorityQueue<Long>(cleanupSize) {
@Override
protected boolean lessThan(Long a, Long b) {
return (a > b);
@ -63,11 +72,17 @@ public class SizeLimitedDistributedMap extends DistributedMap {
for (String child : children) {
Stat stat = zookeeper.exists(dir + "/" + child, null, true);
if (stat.getMzxid() <= topElementMzxId)
if (stat.getMzxid() <= topElementMzxId) {
zookeeper.delete(dir + "/" + child, -1, true);
if (onOverflowObserver != null) onOverflowObserver.onChildDelete(child.substring(PREFIX.length()));
}
}
}
super.put(trackingId, data);
}
interface OnOverflowObserver {
void onChildDelete(String child) throws KeeperException, InterruptedException;
}
}

View File

@ -149,6 +149,7 @@ public class ZkController {
private final DistributedMap overseerRunningMap;
private final DistributedMap overseerCompletedMap;
private final DistributedMap overseerFailureMap;
private final DistributedMap asyncIdsMap;
public final static String COLLECTION_PARAM_PREFIX = "collection.";
public final static String CONFIGNAME_PROP = "configName";
@ -436,6 +437,8 @@ public class ZkController {
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
this.asyncIdsMap = Overseer.getAsyncIdsMap(zkClient);
zkStateReader = new ZkStateReader(zkClient, () -> {
if (cc != null) cc.securityNodeChanged();
});
@ -1930,6 +1933,45 @@ public class ZkController {
return overseerFailureMap;
}
/**
* When an operation needs to be performed in an asynchronous mode, the asyncId needs
* to be claimed by calling this method to make sure it's not duplicate (hasn't been
* claimed by other request). If this method returns true, the asyncId in the parameter
* has been reserved for the operation, meaning that no other thread/operation can claim
* it. If for whatever reason, the operation is not scheduled, the asuncId needs to be
* cleared using {@link #clearAsyncId(String)}.
* If this method returns false, no reservation has been made, and this asyncId can't
* be used, since it's being used by another operation (currently or in the past)
* @param asyncId A string representing the asyncId of an operation. Can't be null.
* @return True if the reservation succeeds.
* False if this ID is already in use.
*/
public boolean claimAsyncId(String asyncId) throws KeeperException {
try {
return asyncIdsMap.putIfAbsent(asyncId, new byte[0]);
} catch (InterruptedException e) {
log.error("Could not claim asyncId=" + asyncId, e);
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
/**
* Clears an asyncId previously claimed by calling {@link #claimAsyncId(String)}
* @param asyncId A string representing the asyncId of an operation. Can't be null.
* @return True if the asyncId existed and was cleared.
* False if the asyncId didn't exist before.
*/
public boolean clearAsyncId(String asyncId) throws KeeperException {
try {
return asyncIdsMap.remove(asyncId);
} catch (InterruptedException e) {
log.error("Could not release asyncId=" + asyncId, e);
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
public int getClientTimeout() {
return clientTimeout;
}

View File

@ -810,7 +810,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
*/
List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
log.info("Executing Collection Cmd : " + params);
log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
String collectionName = message.getStr(NAME);
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();

View File

@ -279,6 +279,14 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
static final Set<String> KNOWN_ROLES = ImmutableSet.of("overseer");
/*
* In SOLR-11739 we change the way the async IDs are checked to decide if one has
* already been used or not. For backward compatibility, we continue to check in the
* old way (meaning, in all the queues) for now. This extra check should be removed
* in Solr 9
*/
private static final boolean CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS = true;
public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180*1000;
public SolrResponse sendToOCPQueue(ZkNodeProps m) throws KeeperException, InterruptedException {
@ -294,21 +302,40 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
String asyncId = m.getStr(ASYNC);
if(asyncId.equals("-1")) {
if (asyncId.equals("-1")) {
throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
}
NamedList<String> r = new NamedList<>();
if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
if (CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS && (
coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
overseerCollectionQueueContains(asyncId)) {
overseerCollectionQueueContains(asyncId))) {
// for back compatibility, check in the old places. This can be removed in Solr 9
r.add("error", "Task with the same requestid already exists.");
} else {
coreContainer.getZkController().getOverseerCollectionQueue()
if (coreContainer.getZkController().claimAsyncId(asyncId)) {
boolean success = false;
try {
coreContainer.getZkController().getOverseerCollectionQueue()
.offer(Utils.toJSON(m));
success = true;
} finally {
if (!success) {
try {
coreContainer.getZkController().clearAsyncId(asyncId);
} catch (Exception e) {
// let the original exception bubble up
log.error("Unable to release async ID={}", asyncId, e);
SolrZkClient.checkInterrupted(e);
}
}
}
} else {
r.add("error", "Task with the same requestid already exists.");
}
}
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
@ -708,18 +735,29 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
if (flush) {
zkController.getOverseerCompletedMap().clear();
zkController.getOverseerFailureMap().clear();
Collection<String> completed = zkController.getOverseerCompletedMap().keys();
Collection<String> failed = zkController.getOverseerFailureMap().keys();
for (String asyncId:completed) {
zkController.getOverseerCompletedMap().remove(asyncId);
zkController.clearAsyncId(asyncId);
}
for (String asyncId:failed) {
zkController.getOverseerFailureMap().remove(asyncId);
zkController.clearAsyncId(asyncId);
}
rsp.getValues().add("status", "successfully cleared stored collection api responses");
return null;
} else {
// Request to cleanup
if (zkController.getOverseerCompletedMap().remove(requestId)) {
zkController.clearAsyncId(requestId);
rsp.getValues().add("status", "successfully removed stored response for [" + requestId + "]");
} else if (zkController.getOverseerFailureMap().remove(requestId)) {
zkController.clearAsyncId(requestId);
rsp.getValues().add("status", "successfully removed stored response for [" + requestId + "]");
} else {
rsp.getValues().add("status", "[" + requestId + "] not found in stored responses");
// Don't call zkController.clearAsyncId for this, since it could be a running/pending task
}
}
return null;

View File

@ -287,6 +287,7 @@ class RebalanceLeaders {
String asyncId = pair.getKey();
if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
coreContainer.getZkController().clearAsyncId(asyncId);
NamedList<Object> fails = (NamedList<Object>) results.get("failures");
if (fails == null) {
fails = new NamedList<>();
@ -300,6 +301,7 @@ class RebalanceLeaders {
foundChange = true;
} else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
coreContainer.getZkController().clearAsyncId(asyncId);
NamedList<Object> successes = (NamedList<Object>) results.get("successes");
if (successes == null) {
successes = new NamedList<>();

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.Locale;
import org.apache.commons.io.FileUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
public class TestDistributedMap extends SolrTestCaseJ4 {
private static Path zkDir;
protected static ZkTestServer zkServer;
@BeforeClass
public static void setUpClass() throws InterruptedException {
zkDir = createTempDir("TestDistributedMap");
zkServer = new ZkTestServer(zkDir.toFile().getAbsolutePath());
zkServer.run();
}
@AfterClass
public static void tearDownClass() throws IOException, InterruptedException {
if (zkServer != null) {
zkServer.shutdown();
zkServer = null;
}
FileUtils.deleteDirectory(zkDir.toFile());
zkDir = null;
}
public void testPut() throws KeeperException, InterruptedException {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
String path = getAndMakeInitialPath(zkClient);
DistributedMap map = createMap(zkClient, path);
assertFalse(zkClient.exists(path + "/" + DistributedMap.PREFIX + "foo", true));
map.put("foo", new byte[0]);
assertTrue(zkClient.exists(path + "/" + DistributedMap.PREFIX + "foo", true));
}
}
public void testGet() throws KeeperException, InterruptedException {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
String path = getAndMakeInitialPath(zkClient);
byte[] data = "data".getBytes(Charset.defaultCharset());
zkClient.makePath(path + "/" + DistributedMap.PREFIX + "foo", data, CreateMode.PERSISTENT, null, false, true);
DistributedMap map = createMap(zkClient, path);
assertArrayEquals(data, map.get("foo"));
}
}
public void testContains() throws KeeperException, InterruptedException {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
String path = getAndMakeInitialPath(zkClient);
DistributedMap map = createMap(zkClient, path);
assertFalse(map.contains("foo"));
zkClient.makePath(path + "/" + DistributedMap.PREFIX + "foo", new byte[0], CreateMode.PERSISTENT, null, false, true);
assertTrue(map.contains("foo"));
}
}
public void testRemove() throws KeeperException, InterruptedException {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
String path = getAndMakeInitialPath(zkClient);
DistributedMap map = createMap(zkClient, path);
assertFalse(map.remove("foo"));
zkClient.makePath(path + "/" + DistributedMap.PREFIX + "foo", new byte[0], CreateMode.PERSISTENT, null, false, true);
assertTrue(map.remove("foo"));
assertFalse(map.contains("foo"));
assertFalse(zkClient.exists(path + "/" + DistributedMap.PREFIX + "foo", true));
}
}
public void testSize() throws KeeperException, InterruptedException {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
String path = getAndMakeInitialPath(zkClient);
DistributedMap map = createMap(zkClient, path);
assertEquals(0, map.size());
map.remove("bar");
assertEquals(0, map.size());
map.put("foo", new byte[0]);
assertEquals(1, map.size());
map.put("foo2", new byte[0]);
assertEquals(2, map.size());
map.remove("foo");
assertEquals(1, map.size());
}
}
public void testPutIfAbsent() throws KeeperException, InterruptedException {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
String path = getAndMakeInitialPath(zkClient);
DistributedMap map = createMap(zkClient, path);
assertEquals(0, map.size());
assertFalse(map.contains("foo"));
assertTrue(map.putIfAbsent("foo", new byte[0]));
assertEquals(1, map.size());
assertTrue(map.contains("foo"));
assertFalse(map.putIfAbsent("foo", new byte[0]));
assertTrue(map.contains("foo"));
assertEquals(1, map.size());
map.remove("foo");
assertFalse(map.contains("foo"));
assertEquals(0, map.size());
assertTrue(map.putIfAbsent("foo", new byte[0]));
assertEquals(1, map.size());
assertTrue(map.contains("foo"));
}
}
public void testKeys() throws KeeperException, InterruptedException {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
String path = getAndMakeInitialPath(zkClient);
DistributedMap map = createMap(zkClient, path);
assertEquals(0, map.keys().size());
map.put("foo", new byte[0]);
assertTrue(map.keys().contains("foo"));
assertEquals(1, map.keys().size());
map.put("bar", new byte[0]);
assertTrue(map.keys().contains("bar"));
assertTrue(map.keys().contains("foo"));
assertEquals(2, map.keys().size());
map.remove("foo");
assertTrue(map.keys().contains("bar"));
assertEquals(1, map.keys().size());
}
}
public void testClear() throws KeeperException, InterruptedException {
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
String path = getAndMakeInitialPath(zkClient);
DistributedMap map = createMap(zkClient, path);
map.clear();
assertEquals(0, map.size());
map.put("foo", new byte[0]);
map.put("bar", new byte[0]);
assertEquals(2, map.size());
map.clear();
assertEquals(0, map.size());
}
}
protected DistributedMap createMap(SolrZkClient zkClient, String path) {
return new DistributedMap(zkClient, path);
}
protected String getAndMakeInitialPath(SolrZkClient zkClient) throws KeeperException, InterruptedException {
String path = String.format(Locale.ROOT, "/%s/%s", getClass().getName(), getTestName());
zkClient.makePath(path, false, true);
return path;
}
}

View File

@ -17,42 +17,53 @@
package org.apache.solr.cloud;
import org.apache.solr.SolrTestCaseJ4;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.solr.common.cloud.SolrZkClient;
public class TestSizeLimitedDistributedMap extends SolrTestCaseJ4 {
public class TestSizeLimitedDistributedMap extends TestDistributedMap {
public void testCleanup() throws Exception {
String zkDir = createTempDir("TestSizeLimitedDistributedMap").toFile().getAbsolutePath();
ZkTestServer server = new ZkTestServer(zkDir);
try {
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
try (SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), 10000)) {
DistributedMap map = Overseer.getCompletedMap(zkClient);
assertTrue(map instanceof SizeLimitedDistributedMap);
for (int i = 0; i < Overseer.NUM_RESPONSES_TO_STORE; i++) {
map.put("xyz_" + i, new byte[0]);
}
assertEquals("Number of items do not match", Overseer.NUM_RESPONSES_TO_STORE, map.size());
// add another to trigger cleanup
map.put("xyz_10000", new byte[0]);
assertEquals("Distributed queue was not cleaned up",
Overseer.NUM_RESPONSES_TO_STORE - (Overseer.NUM_RESPONSES_TO_STORE / 10) + 1, map.size());
for (int i = Overseer.NUM_RESPONSES_TO_STORE; i >= Overseer.NUM_RESPONSES_TO_STORE / 10; i--) {
assertTrue(map.contains("xyz_" + i));
}
for (int i = Overseer.NUM_RESPONSES_TO_STORE / 10 - 1; i >= 0; i--) {
assertFalse(map.contains("xyz_" + i));
}
final List<String> deletedItems = new LinkedList<>();
final Set<String> expectedKeys = new HashSet<>();
int numResponsesToStore=TEST_NIGHTLY?Overseer.NUM_RESPONSES_TO_STORE:100;
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
String path = getAndMakeInitialPath(zkClient);
DistributedMap map = new SizeLimitedDistributedMap(zkClient, path, numResponsesToStore, (element)->deletedItems.add(element));
for (int i = 0; i < numResponsesToStore; i++) {
map.put("xyz_" + i, new byte[0]);
expectedKeys.add("xyz_" + i);
}
} finally {
server.shutdown();
assertEquals("Number of items do not match", numResponsesToStore, map.size());
assertTrue("Expected keys do not match", expectedKeys.containsAll(map.keys()));
assertTrue("Expected keys do not match", map.keys().containsAll(expectedKeys));
// add another to trigger cleanup
map.put("xyz_" + numResponsesToStore, new byte[0]);
expectedKeys.add("xyz_" + numResponsesToStore);
assertEquals("Distributed queue was not cleaned up",
numResponsesToStore - (numResponsesToStore / 10) + 1, map.size());
for (int i = numResponsesToStore; i >= numResponsesToStore / 10; i--) {
assertTrue(map.contains("xyz_" + i));
}
for (int i = numResponsesToStore / 10 - 1; i >= 0; i--) {
assertFalse(map.contains("xyz_" + i));
assertTrue(deletedItems.contains("xyz_" + i));
expectedKeys.remove("xyz_" + i);
}
assertTrue("Expected keys do not match", expectedKeys.containsAll(map.keys()));
assertTrue("Expected keys do not match", map.keys().containsAll(expectedKeys));
map.remove("xyz_" + numResponsesToStore);
assertFalse("map.remove shouldn't trigger the observer",
deletedItems.contains("xyz_" + numResponsesToStore));
}
}
protected DistributedMap createMap(SolrZkClient zkClient, String path) {
return new SizeLimitedDistributedMap(zkClient, path, Overseer.NUM_RESPONSES_TO_STORE, null);
}
}

View File

@ -16,13 +16,22 @@
*/
package org.apache.solr.cloud.api.collections;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.SolrCloudTestCase;
@ -30,8 +39,12 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests the Cloud Collections API.
@ -40,6 +53,8 @@ import org.junit.Test;
public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
private static final int MAX_TIMEOUT_SECONDS = 60;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
@ -174,5 +189,74 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
.processAndWait(client, MAX_TIMEOUT_SECONDS);
assertSame("DeleteCollection did not complete", RequestStatusState.COMPLETED, state);
}
public void testAsyncIdRaceCondition() throws Exception {
SolrClient[] clients = new SolrClient[cluster.getJettySolrRunners().size()];
int j = 0;
for (JettySolrRunner r:cluster.getJettySolrRunners()) {
clients[j++] = new HttpSolrClient.Builder(r.getBaseUrl().toString()).build();
}
RequestStatusState state = CollectionAdminRequest.createCollection("testAsyncIdRaceCondition","conf1",1,1)
.setRouterName("implicit")
.setShards("shard1")
.processAndWait(cluster.getSolrClient(), MAX_TIMEOUT_SECONDS);
assertSame("CreateCollection task did not complete!", RequestStatusState.COMPLETED, state);
int numThreads = 10;
final AtomicInteger numSuccess = new AtomicInteger(0);
final AtomicInteger numFailure = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(numThreads);
ExecutorService es = ExecutorUtil.newMDCAwareFixedThreadPool(numThreads, new DefaultSolrThreadFactory("testAsyncIdRaceCondition"));
try {
for (int i = 0; i < numThreads; i++) {
es.submit(new Runnable() {
@Override
public void run() {
CollectionAdminRequest.Reload reloadCollectionRequest = CollectionAdminRequest.reloadCollection("testAsyncIdRaceCondition");
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException();
}
try {
log.info("{} - Reloading Collection.", Thread.currentThread().getName());
reloadCollectionRequest.processAsync("repeatedId", clients[random().nextInt(clients.length)]);
numSuccess.incrementAndGet();
} catch (SolrServerException e) {
log.info(e.getMessage());
assertEquals("Task with the same requestid already exists.", e.getMessage());
numFailure.incrementAndGet();
} catch (IOException e) {
throw new RuntimeException();
}
}
});
}
es.shutdown();
assertTrue(es.awaitTermination(10, TimeUnit.SECONDS));
assertEquals(1, numSuccess.get());
assertEquals(numThreads - 1, numFailure.get());
} finally {
for (int i = 0; i < clients.length; i++) {
clients[i].close();
}
}
}
public void testAsyncIdBackCompat() throws Exception {
//remove with Solr 9
cluster.getZkClient().makePath("/overseer/collection-map-completed/mn-testAsyncIdBackCompat", true, true);
try {
CollectionAdminRequest.createCollection("testAsyncIdBackCompat","conf1",1,1)
.processAsync("testAsyncIdBackCompat", cluster.getSolrClient());
fail("Expecting exception");
} catch (SolrServerException e) {
assertTrue(e.getMessage().contains("Task with the same requestid already exists"));
}
}
}

View File

@ -520,7 +520,6 @@ public class ZkTestServer {
log.info("start zk server on port:" + port);
}
@SuppressWarnings("deprecation")
public void shutdown() throws IOException, InterruptedException {
// TODO: this can log an exception while trying to unregister a JMX MBean
zkServer.shutdown();