mirror of https://github.com/apache/lucene.git
SOLR-11661: A new HDFS collection reusing unremoved data from a deleted HDFS collection with the same name causes an inconsistent view of documents
parent e6928d857a
commit c56d774eb6
@@ -180,6 +180,9 @@ Bug Fixes

 * SOLR-11873: Use time based expiration cache in all necessary places in HdfsDirectoryFactory. (Mihaly Toth via Mark Miller)

+* SOLR-11661: New HDFS collection reuses unremoved data from a deleted HDFS collection with same name causes
+  inconsistent view of documents (Cao Manh Dat, shalin)
+
 Optimizations
 ----------------------
@@ -47,6 +47,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.NumberUtils;
@@ -63,8 +64,12 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 public class Assign {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

+  public static String getCounterNodePath(String collection) {
+    return ZkStateReader.COLLECTIONS_ZKNODE + "/"+collection+"/counter";
+  }
+
   public static int incAndGetId(DistribStateManager stateManager, String collection, int defaultValue) {
-    String path = "/collections/"+collection;
+    String path = ZkStateReader.COLLECTIONS_ZKNODE + "/"+collection;
     try {
       if (!stateManager.hasData(path)) {
         try {
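Why the counter znode matters: Assign.incAndGetId drives the numeric suffix of core node names, and on shared storage the HDFS data directory is derived from that name. A minimal sketch (plain Java, not Solr code; the directory layout below is a hypothetical stand-in) of what happens if the counter is reset when a collection is deleted and recreated:

// Hypothetical stand-in for the counter znode that Assign.incAndGetId increments.
public class CounterReuseDemo {
  static int counter = 0;

  // Hypothetical shared-FS layout: the data dir is derived from the core node id.
  static String nextDataDir(String collection) {
    return "hdfs://nn/solr/" + collection + "/core_node" + (++counter) + "/data";
  }

  public static void main(String[] args) {
    System.out.println(nextDataDir("test")); // core_node1, created by the first collection
    counter = 0; // simulates deleting the counter znode along with the collection
    // The recreated collection hands out core_node1 again; on HDFS the old,
    // un-removed index under that path would be reused.
    System.out.println(nextDataDir("test"));
  }
}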
@@ -31,6 +31,7 @@ import org.apache.solr.common.NonExistentCoreException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -71,6 +72,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
     }

+    boolean removeCounterNode = true;
     try {
       // Remove the snapshots meta-data for this collection in ZK. Deleting actual index files
       // should be taken care of as part of collection delete operation.
@@ -99,7 +101,16 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       Set<String> okayExceptions = new HashSet<>(1);
       okayExceptions.add(NonExistentCoreException.class.getName());

-      ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+      List<Replica> failedReplicas = ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+      for (Replica failedReplica : failedReplicas) {
+        boolean isSharedFS = failedReplica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && failedReplica.get("dataDir") != null;
+        if (isSharedFS) {
+          // If the replica uses a shared FS and did not receive the unload message, the counter node must not be
+          // removed, because when a new collection with the same name is created, new replicas may reuse the old dataDir.
+          removeCounterNode = false;
+          break;
+        }
+      }

       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
       Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
|
@ -124,10 +135,14 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
|
|||
} finally {
|
||||
|
||||
try {
|
||||
if (zkStateReader.getZkClient().exists(
|
||||
ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
|
||||
zkStateReader.getZkClient().clean(
|
||||
ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
|
||||
String collectionPath = ZkStateReader.getCollectionPathRoot(collection);
|
||||
if (zkStateReader.getZkClient().exists(collectionPath, true)) {
|
||||
if (removeCounterNode) {
|
||||
zkStateReader.getZkClient().clean(collectionPath);
|
||||
} else {
|
||||
final String counterNodePath = Assign.getCounterNodePath(collection);
|
||||
zkStateReader.getZkClient().clean(collectionPath, s -> !s.equals(counterNodePath));
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
SolrException.log(log, "Cleaning up collection in zk was interrupted:"
|
||||
|
|
|
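The predicate-based clean above removes everything under the collection's znode except the counter. A minimal sketch (plain Java, not Solr code; the znode list is hypothetical) of what that filter keeps and deletes:

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class CounterNodeFilterDemo {
  public static void main(String[] args) {
    String collectionPath = "/collections/test";           // hypothetical collection root
    String counterNodePath = collectionPath + "/counter";  // same shape as Assign.getCounterNodePath

    // The filter passed to clean(): true means "delete this znode".
    Predicate<String> deleteIt = s -> !s.equals(counterNodePath);

    List<String> znodes = Arrays.asList(
        collectionPath,
        collectionPath + "/state.json",
        collectionPath + "/counter",
        collectionPath + "/leader_elect");

    for (String znode : znodes) {
      System.out.println(znode + " -> " + (deleteIt.test(znode) ? "delete" : "keep"));
    }
    // Note: the collection root keeps the counter as a child, so deleting the root
    // fails with NotEmptyException, which ZkMaintenanceUtils.clean swallows as expected.
  }
}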
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.api.collections;

 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -798,13 +799,16 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     }
   }

-  private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+  private List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
       NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
-    collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet());
+    return collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet());
   }

-  void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+  /**
+   * Send request to all replicas of a collection
+   * @return List of replicas which are not live and did not receive the request
+   */
+  List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
       NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
     log.info("Executing Collection Cmd : " + params);
     String collectionName = message.getStr(NAME);
@@ -812,30 +816,37 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,

     ClusterState clusterState = zkStateReader.getClusterState();
     DocCollection coll = clusterState.getCollection(collectionName);

+    List<Replica> notLiveReplicas = new ArrayList<>();
     for (Slice slice : coll.getSlices()) {
-      sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap);
+      notLiveReplicas.addAll(sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap));
     }

     processResponses(results, shardHandler, false, null, asyncId, requestMap, okayExceptions);
+
+    return notLiveReplicas;
   }

-  void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
+  /**
+   * Send request to all replicas of a slice
+   * @return List of replicas which are not live and did not receive the request
+   */
+  List<Replica> sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
       Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {

+    List<Replica> notLiveReplicas = new ArrayList<>();
     for (Replica replica : slice.getReplicas()) {
-      if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))
-          && (stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
-
-        // For thread safety, only simple clone the ModifiableSolrParams
-        ModifiableSolrParams cloneParams = new ModifiableSolrParams();
-        cloneParams.add(params);
-        cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
-
-        sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
+      if ((stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
+        if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
+          // For thread safety, only simple clone the ModifiableSolrParams
+          ModifiableSolrParams cloneParams = new ModifiableSolrParams();
+          cloneParams.add(params);
+          cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
+
+          sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
+        } else {
+          notLiveReplicas.add(replica);
+        }
       }
     }
+    return notLiveReplicas;
   }

   private void processResponse(NamedList results, ShardResponse srsp, Set<String> okayExceptions) {
@@ -189,9 +189,9 @@ public class ZkStateWriter {
       DocCollection c = entry.getValue();

       if (c == null) {
-        // let's clean up the collections path for this collection
-        log.debug("going to delete_collection {}", path);
-        reader.getZkClient().clean("/collections/" + name);
+        // let's clean up only the state.json of this collection; the rest is cleaned by the delete collection cmd
+        log.debug("going to delete state.json {}", path);
+        reader.getZkClient().clean(path);
       } else if (c.getStateFormat() > 1) {
         byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
         if (reader.getZkClient().exists(path, true)) {
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.hdfs;
+
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MoveReplicaHDFSTest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+@ThreadLeakFilters(defaultFilters = true, filters = {
+    BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
+    MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
+})
+public class HDFSCollectionsAPITest extends SolrCloudTestCase {
+
+  private static MiniDFSCluster dfsCluster;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    configureCluster(2)
+        .configure();
+
+    System.setProperty("solr.hdfs.blockcache.enabled", "false");
+    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+
+    ZkConfigManager configManager = new ZkConfigManager(zkClient());
+    configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
+
+    System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
+  }
+
+
+  @AfterClass
+  public static void teardownClass() throws Exception {
+    cluster.shutdown(); // need to close before the MiniDFSCluster
+    HdfsTestUtil.teardownClass(dfsCluster);
+    dfsCluster = null;
+  }
+
+  public void testDataDirIsNotReused() throws Exception {
+    JettySolrRunner jettySolrRunner = cluster.getJettySolrRunner(0);
+    String collection = "test";
+    cluster.getSolrClient().setDefaultCollection(collection);
+    CollectionAdminRequest.createCollection(collection, "conf1", 1, 1)
+        .setCreateNodeSet(jettySolrRunner.getNodeName()).process(cluster.getSolrClient());
+    waitForState("", collection, clusterShape(1, 1));
+    cluster.getSolrClient().setDefaultCollection(collection);
+    cluster.getSolrClient().add(new SolrInputDocument("id", "1"));
+    cluster.getSolrClient().add(new SolrInputDocument("id", "2"));
+    cluster.getSolrClient().commit();
+    cluster.getSolrClient().add(new SolrInputDocument("id", "3"));
+
+    jettySolrRunner.stop();
+    waitForState("", collection, (liveNodes, collectionState) -> {
+      Replica replica = collectionState.getSlice("shard1").getReplicas().iterator().next();
+      return replica.getState() == Replica.State.DOWN;
+    });
+    CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+
+    jettySolrRunner.start();
+
+    CollectionAdminRequest.createCollection(collection, "conf1", 1, 1)
+        .setCreateNodeSet(cluster.getJettySolrRunner(1).getNodeName()).process(cluster.getSolrClient());
+    waitForState("", collection, clusterShape(1, 1));
+    QueryResponse response = cluster.getSolrClient().query(collection, new SolrQuery("*:*"));
+    assertEquals(0L, response.getResults().getNumFound());
+  }
+
+}
@@ -34,6 +34,7 @@ import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;

 import org.apache.commons.io.FileUtils;
@@ -761,6 +762,10 @@ public class SolrZkClient implements Closeable {
     ZkMaintenanceUtils.clean(this, path);
   }

+  public void clean(String path, Predicate<String> nodeFilter) throws InterruptedException, KeeperException {
+    ZkMaintenanceUtils.clean(this, path, nodeFilter);
+  }
+
   public void upConfig(Path confPath, String confName) throws IOException {
     ZkMaintenanceUtils.upConfig(this, confPath, confName);
   }
@@ -26,8 +26,11 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
+import java.util.TreeSet;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;

 import org.apache.solr.client.solrj.SolrServerException;
@@ -244,6 +247,33 @@ public class ZkMaintenanceUtils {
       }
     });
   }

+  /**
+   * Delete a path and all of its sub nodes
+   * @param filter znodes for which the predicate returns true are deleted
+   */
+  public static void clean(SolrZkClient zkClient, String path, Predicate<String> filter) throws InterruptedException, KeeperException {
+    if (filter == null) {
+      clean(zkClient, path);
+      return;
+    }
+
+    TreeSet<String> paths = new TreeSet<>(Comparator.comparingInt(String::length).reversed());
+
+    traverseZkTree(zkClient, path, VISIT_ORDER.VISIT_POST, znode -> {
+      if (!znode.equals("/") && filter.test(znode)) paths.add(znode);
+    });
+
+    for (String subpath : paths) {
+      if (!subpath.equals("/")) {
+        try {
+          zkClient.delete(subpath, -1, true);
+        } catch (KeeperException.NotEmptyException | KeeperException.NoNodeException e) {
+          // expected
+        }
+      }
+    }
+  }
+
   public static void uploadToZK(SolrZkClient zkClient, final Path fromPath, final String zkPath,
       final Pattern filenameExclusions) throws IOException {
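A note on the ordering in the new clean(): the TreeSet sorts candidate znodes by descending path length, so children are deleted before their parents. A minimal sketch (plain Java, not Solr code; the paths are hypothetical) of that ordering:

import java.util.Comparator;
import java.util.TreeSet;

public class DeepestFirstDemo {
  public static void main(String[] args) {
    // Same comparator as clean() above: longest (deepest) paths first.
    TreeSet<String> paths = new TreeSet<>(Comparator.comparingInt(String::length).reversed());
    paths.add("/collections/test");
    paths.add("/collections/test/state.json");
    paths.add("/collections/test/leader_elect/shard1");
    paths.add("/collections/test/leader_elect");

    // Children print (and would be deleted) before their parents.
    paths.forEach(System.out::println);

    // Caveat: the comparator orders by length only, so two distinct znodes with
    // equal path lengths compare as equal and a TreeSet keeps just one of them;
    // the example paths here all have distinct lengths.
  }
}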