mirror of https://github.com/apache/lucene.git
SOLR-9320: A REPLACENODE command to decommission an existing node with another new node and SOLR-9318 the DELETENODE command that deletes all replicas in a node
This commit is contained in:
parent
74b470a7a9
commit
519be6acf0
144
solr/CHANGES.txt
144
solr/CHANGES.txt
|
@ -16,6 +16,40 @@ In this release, there is an example Solr server including a bundled
|
|||
servlet container in the directory named "example".
|
||||
See the Quick Start guide at http://lucene.apache.org/solr/quickstart.html
|
||||
|
||||
================== 7.0.0 ==================
|
||||
|
||||
Upgrading from Solr 6.x
|
||||
----------------------
|
||||
|
||||
* HttpClientInterceptorPlugin is now HttpClientBuilderPlugin and must work with a
|
||||
SolrHttpClientBuilder rather than an HttpClientConfigurer.
|
||||
|
||||
* HttpClientUtil now allows configuring HttpClient instances via SolrHttpClientBuilder
|
||||
rather than an HttpClientConfigurer.
|
||||
|
||||
* SolrClient implementations now use their own internal configuration for socket timeouts,
|
||||
connect timeouts, and allowing redirects rather than what is set as the default when
|
||||
building the HttpClient instance. Use the appropriate setters on the SolrClient instance.
|
||||
|
||||
* HttpSolrClient#setAllowCompression has been removed and compression must be enabled as
|
||||
a constructor param.
|
||||
|
||||
* HttpSolrClient#setDefaultMaxConnectionsPerHost and
|
||||
HttpSolrClient#setMaxTotalConnections have been removed. These now default very
|
||||
high and can only be changed via param when creating an HttpClient instance.
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
* SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.
|
||||
(Mark Miller, shalin)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
* SOLR-4509: Move to non deprecated HttpClient impl classes to remove stale connection
|
||||
check on every request and move connection lifecycle management towards the client.
|
||||
(Ryan Zezeski, Mark Miller, Shawn Heisey, Steve Davids)
|
||||
|
||||
================== 6.2.0 ==================
|
||||
|
||||
Versions of Major Components
|
||||
|
@ -54,34 +88,18 @@ New Features
|
|||
* SOLR-9243: Add terms.list parameter to the TermsComponent to fetch the docFreq for a list of terms
|
||||
(Joel Bernstein)
|
||||
|
||||
* SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient.
|
||||
(Marvin Justice, Christine Poerschke)
|
||||
|
||||
* SOLR-9270: Allow spatialContextFactory to be simply "JTS". And if any spatial params include the old
|
||||
Spatial4j package "com.spatial4j.core" it is rewritten to "org.locationtech.spatial4j" with a warning.
|
||||
(David Smiley)
|
||||
|
||||
* SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient.
|
||||
(Marvin Justice, Christine Poerschke)
|
||||
|
||||
* SOLR-9240: Support parallel ETL with the topic expression (Joel Bernstein)
|
||||
|
||||
* SOLR-9275: XML QueryParser support (defType=xmlparser) now extensible via configuration.
|
||||
(Christine Poerschke)
|
||||
|
||||
* SOLR-9038: Solr core snapshots: The current commit can be snapshotted which retains the commit and associates it with
|
||||
a name. The core admin API can create snapshots, list them, and delete them. Snapshot names can be referenced in
|
||||
doing a core backup, and in replication. Snapshot metadata is stored in a new snapshot_metadata/ dir.
|
||||
(Hrishikesh Gadre via David Smiley)
|
||||
|
||||
* SOLR-9279: New boolean comparison function queries comparing numeric arguments: gt, gte, lt, lte, eq
|
||||
(Doug Turnbull, David Smiley)
|
||||
|
||||
* SOLR-9200: Add Delegation Token Support to Solr.
|
||||
(Gregory Chanan)
|
||||
|
||||
* SOLR-9252: Feature selection and logistic regression on text (Cao Manh Dat, Joel Bernstein)
|
||||
|
||||
* SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient.
|
||||
(Noble Paul, Renaud Delbru, shalin)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
@ -101,14 +119,14 @@ Bug Fixes
|
|||
|
||||
* SOLR-8626: 404 error when clicking nodes in cloud graph view in angular UI. (janhoy, Trey Grainger via shalin)
|
||||
|
||||
* SOLR-8777: Duplicate Solr process can cripple a running process. (Jessica Cheng Mallet, Scott Blum, shalin)
|
||||
|
||||
* SOLR-9254: GraphTermsQueryQParserPlugin throws NPE when field being searched is not present in segment
|
||||
(Joel Bernstein)
|
||||
|
||||
* SOLR-8657: Fix SolrRequestInfo error logs if QuerySenderListener is being used (Pascal Chollet,
|
||||
Tomás Fernández Löbbe)
|
||||
|
||||
* SOLR-8777: Duplicate Solr process can cripple a running process. (Jessica Cheng Mallet, Scott Blum, shalin)
|
||||
|
||||
* SOLR-9246: If the JDBCStream sees an unknown column type it will now throw a detailed exception. (Dennis Gove)
|
||||
|
||||
* SOLR-9181: Fix some races in CollectionStateWatcher API (Alan Woodward, Scott
|
||||
|
@ -134,9 +152,6 @@ Bug Fixes
|
|||
* SOLR-9287: Including 'score' in the 'fl' param when doing an RTG no longer causes an NPE
|
||||
(hossman, Ishan Chattopadhyaya)
|
||||
|
||||
* SOLR-9290: TCP-connections in CLOSE_WAIT spike during heavy indexing and do not decrease.
|
||||
(Mads Tomasgård Bjørgan, Anshum Gupta, Shai Erera, hossman, Mark Miller, yonik, shalin)
|
||||
|
||||
* SOLR-7280: In cloud-mode sort the cores smartly before loading & limit threads to improve cluster stability
|
||||
(noble, Erick Erickson, shalin)
|
||||
|
||||
|
@ -150,19 +165,6 @@ Bug Fixes
|
|||
|
||||
* SOLR-9339: NPE in CloudSolrClient when the response is null (noble)
|
||||
|
||||
* SOLR-8596: Web UI doesn't correctly generate queries which include local parameters (Alexandre Rafalovitch, janhoy)
|
||||
|
||||
* SOLR-8645: managed-schema is now syntax highlighted in cloud->Tree view (Alexandre Rafalovitch via janhoy)
|
||||
|
||||
* SOLR-8379: UI Cloud->Tree view now shows .txt files correctly (Alexandre Rafalovitch via janhoy)
|
||||
|
||||
* SOLR-9003: New Admin UI's Dataimport screen now correctly displays DIH Debug output (Alexandre Rafalovitch)
|
||||
|
||||
* SOLR-9308: Fix distributed RTG to forward request params, fixes fq and non-default fl params (hossman)
|
||||
|
||||
* SOLR-9179: NPE in IndexSchema using IBM JDK (noble, Colvin Cowie)
|
||||
|
||||
* SOLR-9397: Config API does not support adding caches (noble)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
@ -172,13 +174,6 @@ Optimizations
|
|||
* SOLR-9264: Optimize ZkController.publishAndWaitForDownStates to not read all collection states and
|
||||
watch relevant collections instead. (Hrishikesh Gadre, shalin)
|
||||
|
||||
* SOLR-9335: Solr cache/search/update stats counters now use LongAdder which are supposed to have higher throughput
|
||||
under high contention. (Varun Thacker)
|
||||
|
||||
* SOLR-9350: JSON Facets: method="stream" will no longer always uses & populates the filter cache, likely
|
||||
flushing it. 'cacheDf' can be configured to set a doc frequency threshold, now defaulting to 1/16th doc count.
|
||||
Using -1 Disables use of the cache. (David Smiley, yonik)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
@ -202,23 +197,6 @@ Other Changes
|
|||
* SOLR-9163: Sync up basic_configs and data_driven_schema_configs, removing almost all differences
|
||||
except what is required for schemaless. (yonik)
|
||||
|
||||
* SOLR-9340: Change ZooKeeper disconnect and session expiry related logging from INFO to WARN to
|
||||
make debugging easier (Varun Thacker)
|
||||
|
||||
* SOLR-9358: [AngularUI] In Cloud->Tree file view area, collapse metadata by default (janhoy)
|
||||
|
||||
* SOLR-9256: asserting hasNext() contract in JdbcDataSource in DataImportHandler (Kristine Jetzke via Mikhail Khludnev)
|
||||
|
||||
* SOLR-9209: extracting JdbcDataSource.createResultSetIterator() for extension (Kristine Jetzke via Mikhail Khludnev)
|
||||
|
||||
* SOLR-9392: Fixed CDCR Test failures which were due to leaked resources. (shalin)
|
||||
|
||||
* SOLR-9353: Factor out ReRankQParserPlugin.ReRankQueryRescorer private class. (Christine Poerschke)
|
||||
|
||||
* SOLR-9385: Add QParser.getParser(String,SolrQueryRequest) variant. (Christine Poerschke)
|
||||
|
||||
* SOLR-9367: Improved TestInjection's randomization logic to use LuceneTestCase.random() (hossman)
|
||||
|
||||
================== 6.1.0 ==================
|
||||
|
||||
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
||||
|
@ -289,7 +267,12 @@ New Features
|
|||
|
||||
* SOLR-8349: Allow sharing of large in memory data structures across cores (Gus Heck, noble)
|
||||
|
||||
* SOLR-8913: When using a shared filesystem we should store data dir and tlog dir locations in
|
||||
* SOLR-9009: Adds ability to get an Explanation of a Streaming Expression (Dennis Gove)
|
||||
|
||||
* SOLR-8918: Adds Streaming to the admin page under the collections section. Includes
|
||||
ability to see graphically the expression explanation (Dennis Gove)
|
||||
|
||||
* SOLR-8913: When using a shared filesystem we should store data dir and tlog dir locations in
|
||||
the cluster state. (Mark Miller)
|
||||
|
||||
* SOLR-8809: Implement Connection.prepareStatement (Kevin Risden)
|
||||
|
@ -298,11 +281,6 @@ New Features
|
|||
|
||||
* SOLR-9041: 'core-admin-read' and 'core-admin-edit' are well known permissions (noble)
|
||||
|
||||
* SOLR-9009: Adds ability to get an Explanation of a Streaming Expression (Dennis Gove)
|
||||
|
||||
* SOLR-8918: Adds Streaming to the admin page under the collections section. Includes
|
||||
ability to see graphically the expression explanation (Dennis Gove)
|
||||
|
||||
* SOLR-8986: Add Random Streaming Expression (Joel Bernstein)
|
||||
|
||||
* SOLR-8925: Add gatherNodes Streaming Expression to support breadth first traversals (Joel Bernstein)
|
||||
|
@ -342,6 +320,8 @@ New Features
|
|||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
* SOLR-8855: The HDFS BlockDirectory should not clean up it's cache on shutdown. (Mark Miller)
|
||||
|
||||
* SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a
|
||||
SolrResponse node is in the overseer queue. (Jessica Cheng Mallet via shalin)
|
||||
|
||||
|
@ -379,8 +359,6 @@ Bug Fixes
|
|||
|
||||
* SOLR-9198: config APIs unable to add multiple values with same name (noble)
|
||||
|
||||
* SOLR-9191: OverseerTaskQueue.peekTopN() fatally flawed (Scott Blum, Noble Paul)
|
||||
|
||||
* SOLR-8812: edismax: turn off mm processing if no explicit mm spec is provided
|
||||
and there are explicit operators (except for AND) - addresses problems caused by SOLR-2649.
|
||||
(Greg Pendlebury, Jan Høydahl, Erick Erickson, Steve Rowe)
|
||||
|
@ -425,10 +403,13 @@ Other Changes
|
|||
* SOLR-8866: UpdateLog will now throw an exception if it doesn't know how to serialize a value.
|
||||
(David Smiley)
|
||||
|
||||
* SOLR-8842: security rules made foolproof by asking the requesthandler about the well known
|
||||
permission name.
The APIs are also modified to use 'index' as the unique identifier instead of name.
|
||||
* SOLR-8842: security rules made more foolproof by asking the requesthandler about the well known
|
||||
permission name.
The APIs are also modified to use 'index' as the unique identifier instead of name.
The APIs are also modified to ue 'index' as the unique identifier instead of name.
|
||||
Name is an optional attribute
now and only to be used when specifying well-known permissions (noble)
|
||||
|
||||
* SOLR-5616: Simplifies grouping code to use ResponseBuilder.needDocList() to determine if it needs to
|
||||
generate a doc list for grouped results. (Steven Bower, Keith Laban, Dennis Gove)
|
||||
|
||||
* SOLR-8869: Optionally disable printing field cache entries in SolrFieldCacheMBean (Gregory Chanan)
|
||||
|
||||
* SOLR-8892: Allow SolrInfoMBeans to return different statistics for /jmx vs web ui calls.
|
||||
|
@ -469,9 +450,6 @@ Other Changes
|
|||
|
||||
* SOLR-9110: Move JoinFromCollection- SubQueryTransformer- BlockJoinFacet- Distrib Tests to SolrCloudTestCase (Mikhail Khludnev)
|
||||
|
||||
* SOLR-9160: Sync 6x and 7.0 move of UninvertingReader, SlowCompositeReaderWrapper for Solr (LUCENE-7283)
|
||||
(yonik)
|
||||
|
||||
* SOLR-9136: Separate out the error statistics into server-side error vs client-side error
|
||||
(Jessica Cheng Mallet via Erick Erickson)
|
||||
|
||||
|
@ -839,8 +817,6 @@ Bug Fixes
|
|||
other than count, resulted in incorrect results. This has been fixed, and facet.prefix
|
||||
support for facet.method=uif has been enabled. (Mikhail Khludnev, yonik)
|
||||
|
||||
* SOLR-8135: If a core reload is fired after a core close, it is not a non-recoverable error (noble)
|
||||
|
||||
* SOLR-8790: Collections API responses contain node name in the core-level responses that are
|
||||
returned. (Anshum Gupta)
|
||||
|
||||
|
@ -986,19 +962,6 @@ Other Changes
|
|||
* SOLR-8758: Add a new SolrCloudTestCase class, using MiniSolrCloudCluster (Alan
|
||||
Woodward)
|
||||
|
||||
* SOLR-8764: Remove all deprecated methods and classes from master prior to the 6.0 release. (Steve Rowe)
|
||||
|
||||
* SOLR-8780: Remove unused OverseerCollectionMessageHandler#getClusterStatus method. (Varun Thacker)
|
||||
|
||||
* SOLR-8778: Deprecate CSVStrategy's setters, and make its pre-configured strategies immutable. (Steve Rowe)
|
||||
|
||||
* SOLR-7010: Remove facet.date client functionality. (Steve Rowe)
|
||||
|
||||
* SOLR-8725: Allow hyphen in collection, core, shard, and alias name as the non-first character (Anshum Gupta)
|
||||
|
||||
* SOLR-8423: DeleteShard and DeleteReplica should cleanup instance and data directory by default and add
|
||||
support for optionally retaining the directories. (Anshum Gupta)
|
||||
|
||||
* SOLR-8736: schema GET operations on fields, dynamicFields, fieldTypes, copyField are
|
||||
reimplemented as a part of the bulk API with less details (noble)
|
||||
|
||||
|
@ -1106,9 +1069,6 @@ Bug Fixes
|
|||
* SOLR-9176: facet method ENUM was sometimes unnecessarily being rewritten to
|
||||
FCS, causing slowdowns (Alessandro Benedetti, Jesse McLaughlin, Alan Woodward)
|
||||
|
||||
* SOLR-9234: srcField works only when all fields are captured in the /update/json/docs
|
||||
endpoint (noble)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public DeleteNodeCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
ocmh.checkRequired(message, "node");
|
||||
String node = message.getStr("node");
|
||||
if (!state.liveNodesContain(node)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + node + " is not live");
|
||||
}
|
||||
List<ZkNodeProps> sourceReplicas = ReplaceNodeCmd.getReplicasOfNode(node, state);
|
||||
cleanupReplicas(results, state, sourceReplicas, ocmh);
|
||||
return null;
|
||||
}
|
||||
|
||||
static void cleanupReplicas(NamedList results,
|
||||
ClusterState clusterState,
|
||||
List<ZkNodeProps> sourceReplicas,
|
||||
OverseerCollectionMessageHandler ocmh) throws InterruptedException {
|
||||
CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
|
||||
for (ZkNodeProps sourceReplica : sourceReplicas) {
|
||||
log.info("deleting replica from from node {} ", Utils.toJSONString(sourceReplica));
|
||||
NamedList deleteResult = new NamedList();
|
||||
try {
|
||||
ocmh.deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
|
||||
cleanupLatch.countDown();
|
||||
if (deleteResult.get("failure") != null) {
|
||||
synchronized (results) {
|
||||
results.add("failure", "could not delete because " + deleteResult.get("failure") + " " + Utils.toJSONString(sourceReplica));
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (KeeperException e) {
|
||||
log.info("Error deleting ", e);
|
||||
cleanupLatch.countDown();
|
||||
} catch (Exception e) {
|
||||
cleanupLatch.countDown();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
log.info("Waiting for deletes to complete");
|
||||
cleanupLatch.await(5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -16,6 +16,10 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
|
@ -83,12 +87,20 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
|
|||
zkStateReader, myId, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer);
|
||||
final OverseerConfigSetMessageHandler configMessageHandler = new OverseerConfigSetMessageHandler(
|
||||
zkStateReader);
|
||||
return message -> {
|
||||
String operation = message.getStr(Overseer.QUEUE_OPERATION);
|
||||
if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
|
||||
return configMessageHandler;
|
||||
return new OverseerMessageHandlerSelector() {
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IOUtils.closeQuietly(collMessageHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message) {
|
||||
String operation = message.getStr(Overseer.QUEUE_OPERATION);
|
||||
if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
|
||||
return configMessageHandler;
|
||||
}
|
||||
return collMessageHandler;
|
||||
}
|
||||
return collMessageHandler;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URI;
|
||||
|
@ -35,8 +36,13 @@ import java.util.Optional;
|
|||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
|
@ -75,6 +81,7 @@ import org.apache.solr.common.params.CoreAdminParams;
|
|||
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.ShardParams;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
|
@ -88,6 +95,7 @@ import org.apache.solr.handler.component.ShardHandlerFactory;
|
|||
import org.apache.solr.handler.component.ShardRequest;
|
||||
import org.apache.solr.handler.component.ShardResponse;
|
||||
import org.apache.solr.update.SolrIndexSplitter;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.RTimer;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.solr.util.stats.Snapshot;
|
||||
|
@ -119,10 +127,12 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.BA
|
|||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
import static org.apache.solr.common.util.StrUtils.formatString;
|
||||
|
@ -132,7 +142,7 @@ import static org.apache.solr.common.util.Utils.makeMap;
|
|||
* A {@link OverseerMessageHandler} that handles Collections API related
|
||||
* overseer messages.
|
||||
*/
|
||||
public class OverseerCollectionMessageHandler implements OverseerMessageHandler {
|
||||
public class OverseerCollectionMessageHandler implements OverseerMessageHandler , Closeable {
|
||||
|
||||
public static final String NUM_SLICES = "numShards";
|
||||
|
||||
|
@ -172,7 +182,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
private Overseer overseer;
|
||||
private ShardHandlerFactory shardHandlerFactory;
|
||||
private String adminPath;
|
||||
private ZkStateReader zkStateReader;
|
||||
ZkStateReader zkStateReader;
|
||||
private String myId;
|
||||
private Overseer.Stats stats;
|
||||
private OverseerNodePrioritizer overseerPrioritizer;
|
||||
|
@ -181,6 +191,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
// This is used for handling mutual exclusion of the tasks.
|
||||
|
||||
final private LockTree lockTree = new LockTree();
|
||||
ExecutorService tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
|
||||
new SynchronousQueue<>(),
|
||||
new DefaultSolrThreadFactory("OverseerCollectionMessageHandlerThreadFactory"));
|
||||
|
||||
static final Random RANDOM;
|
||||
static {
|
||||
|
@ -193,6 +206,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
RANDOM = new Random(seed.hashCode());
|
||||
}
|
||||
}
|
||||
private final Map<CollectionParams.CollectionAction, Cmd> commandMap;
|
||||
|
||||
public OverseerCollectionMessageHandler(ZkStateReader zkStateReader, String myId,
|
||||
final ShardHandlerFactory shardHandlerFactory,
|
||||
|
@ -207,6 +221,11 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
this.stats = stats;
|
||||
this.overseer = overseer;
|
||||
this.overseerPrioritizer = overseerPrioritizer;
|
||||
commandMap = new ImmutableMap.Builder<CollectionParams.CollectionAction, Cmd>()
|
||||
.put(REPLACENODE, new ReplaceNodeCmd(this))
|
||||
.put(DELETENODE, new DeleteNodeCmd(this))
|
||||
.build()
|
||||
;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -244,7 +263,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
createShard(zkStateReader.getClusterState(), message, results);
|
||||
break;
|
||||
case DELETEREPLICA:
|
||||
deleteReplica(zkStateReader.getClusterState(), message, results);
|
||||
deleteReplica(zkStateReader.getClusterState(), message, results, null);
|
||||
break;
|
||||
case MIGRATE:
|
||||
migrate(zkStateReader.getClusterState(), message, results);
|
||||
|
@ -256,7 +275,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
processRoleCommand(message, operation);
|
||||
break;
|
||||
case ADDREPLICA:
|
||||
addReplica(zkStateReader.getClusterState(), message, results);
|
||||
addReplica(zkStateReader.getClusterState(), message, results, null);
|
||||
break;
|
||||
case OVERSEERSTATUS:
|
||||
getOverseerStatus(message, results);
|
||||
|
@ -294,9 +313,15 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
results.add("MOCK_FINISHED", System.currentTimeMillis());
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
|
||||
+ operation);
|
||||
default: {
|
||||
Cmd command = commandMap.get(action);
|
||||
if (command != null) {
|
||||
command.call(zkStateReader.getClusterState(),message, results);
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
|
||||
+ operation);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String collName = message.getStr("collection");
|
||||
|
@ -590,12 +615,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results)
|
||||
void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
|
||||
throws KeeperException, InterruptedException {
|
||||
log.info("deleteReplica() : {}", Utils.toJSONString(message));
|
||||
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
|
||||
String collectionName = message.getStr(COLLECTION_PROP);
|
||||
String shard = message.getStr(SHARD_ID_PROP);
|
||||
String replicaName = message.getStr(REPLICA_PROP);
|
||||
boolean parallel = message.getBool("parallel", false);
|
||||
|
||||
DocCollection coll = clusterState.getCollection(collectionName);
|
||||
Slice slice = coll.getSlice(shard);
|
||||
|
@ -623,9 +650,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
|
||||
String asyncId = message.getStr(ASYNC);
|
||||
Map<String, String> requestMap = null;
|
||||
AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
|
||||
if (asyncId != null) {
|
||||
requestMap = new HashMap<>(1, 1.0f);
|
||||
requestMap.set(new HashMap<>(1, 1.0f));
|
||||
}
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
|
@ -636,19 +663,42 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
|
||||
params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
|
||||
|
||||
sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
|
||||
sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get());
|
||||
AtomicReference<Exception> exp = new AtomicReference<>();
|
||||
|
||||
processResponses(results, shardHandler, false, null, asyncId, requestMap);
|
||||
Callable<Boolean> callable = () -> {
|
||||
try {
|
||||
processResponses(results, shardHandler, false, null, asyncId, requestMap.get());
|
||||
|
||||
//check if the core unload removed the corenode zk entry
|
||||
if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;
|
||||
//check if the core unload removed the corenode zk entry
|
||||
if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return Boolean.TRUE;
|
||||
|
||||
// try and ensure core info is removed from cluster state
|
||||
deleteCoreNode(collectionName, replicaName, replica, core);
|
||||
if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return;
|
||||
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,
|
||||
"Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
|
||||
// try and ensure core info is removed from cluster state
|
||||
deleteCoreNode(collectionName, replicaName, replica, core);
|
||||
if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;
|
||||
return Boolean.FALSE;
|
||||
} catch (Exception e) {
|
||||
results.add("failure", "Could not complete delete " + e.getMessage());
|
||||
throw e;
|
||||
} finally {
|
||||
if (onComplete != null) onComplete.run();
|
||||
}
|
||||
};
|
||||
|
||||
if (!parallel) {
|
||||
try {
|
||||
if (!callable.call())
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,
|
||||
"Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
|
||||
} catch (InterruptedException | KeeperException e) {
|
||||
throw e;
|
||||
} catch (Exception ex) {
|
||||
throw new SolrException(ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
|
||||
}
|
||||
|
||||
} else {
|
||||
tpe.submit(callable);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
|
||||
|
@ -679,7 +729,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
|
||||
}
|
||||
|
||||
private void checkRequired(ZkNodeProps message, String... props) {
|
||||
void checkRequired(ZkNodeProps message, String... props) {
|
||||
for (String prop : props) {
|
||||
if(message.get(prop) == null){
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, StrUtils.join(Arrays.asList(props),',') +" are required params" );
|
||||
|
@ -1137,7 +1187,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
if (asyncId != null) {
|
||||
propMap.put(ASYNC, asyncId);
|
||||
}
|
||||
addReplica(clusterState, new ZkNodeProps(propMap), results);
|
||||
addReplica(clusterState, new ZkNodeProps(propMap), results, null);
|
||||
}
|
||||
|
||||
processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
|
||||
|
@ -1307,7 +1357,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
// now actually create replica cores on sub shard nodes
|
||||
for (Map<String, Object> replica : replicas) {
|
||||
addReplica(clusterState, new ZkNodeProps(replica), results);
|
||||
addReplica(clusterState, new ZkNodeProps(replica), results, null);
|
||||
}
|
||||
|
||||
processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
|
||||
|
@ -1681,7 +1731,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
if(asyncId != null) {
|
||||
props.put(ASYNC, asyncId);
|
||||
}
|
||||
addReplica(clusterState, new ZkNodeProps(props), results);
|
||||
addReplica(clusterState, new ZkNodeProps(props), results, null);
|
||||
|
||||
processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
|
||||
"temporary collection in target leader node.", asyncId, requestMap);
|
||||
|
@ -2110,12 +2160,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
}
|
||||
|
||||
private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results)
|
||||
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
|
||||
throws KeeperException, InterruptedException {
|
||||
log.info("addReplica() : {}", Utils.toJSONString(message));
|
||||
String collection = message.getStr(COLLECTION_PROP);
|
||||
String node = message.getStr(CoreAdminParams.NODE);
|
||||
String shard = message.getStr(SHARD_ID_PROP);
|
||||
String coreName = message.getStr(CoreAdminParams.NAME);
|
||||
boolean parallel = message.getBool("parallel", false);
|
||||
if (StringUtils.isBlank(coreName)) {
|
||||
coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
|
||||
}
|
||||
|
@ -2138,7 +2190,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
|
||||
overseer.getZkController().getCoreContainer()).get(0).nodeName;
|
||||
}
|
||||
log.info("Node not provided, Identified {} for creating new replica", node);
|
||||
log.info("Node Identified {} for creating new replica", node);
|
||||
|
||||
if (!clusterState.liveNodesContain(node)) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
|
||||
|
@ -2161,10 +2213,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
|
||||
if (!Overseer.isLegacy(zkStateReader)) {
|
||||
if (!skipCreateReplicaInClusterState) {
|
||||
ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), ZkStateReader.COLLECTION_PROP,
|
||||
collection, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.BASE_URL_PROP,
|
||||
zkStateReader.getBaseUrlForNodeName(node), ZkStateReader.NODE_NAME_PROP, node);
|
||||
ZkNodeProps props = new ZkNodeProps(
|
||||
Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
|
||||
ZkStateReader.COLLECTION_PROP, collection,
|
||||
ZkStateReader.SHARD_ID_PROP, shard,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
|
||||
ZkStateReader.NODE_NAME_PROP, node);
|
||||
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
|
||||
}
|
||||
params.set(CoreAdminParams.CORE_NODE_NAME,
|
||||
|
@ -2204,9 +2260,28 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
Map<String,String> requestMap = new HashMap<>();
|
||||
sendShardRequest(node, params, shardHandler, asyncId, requestMap);
|
||||
|
||||
processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
|
||||
final String fnode = node;
|
||||
final String fcoreName = coreName;
|
||||
|
||||
waitForCoreNodeName(collection, node, coreName);
|
||||
Runnable runnable = () -> {
|
||||
processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
|
||||
waitForCoreNodeName(collection, fnode, fcoreName);
|
||||
if (onComplete != null) onComplete.run();
|
||||
};
|
||||
|
||||
if (!parallel) {
|
||||
runnable.run();
|
||||
} else {
|
||||
tpe.submit(runnable);
|
||||
}
|
||||
|
||||
|
||||
return new ZkNodeProps(
|
||||
ZkStateReader.COLLECTION_PROP, collection,
|
||||
ZkStateReader.SHARD_ID_PROP, shard,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.NODE_NAME_PROP, node
|
||||
);
|
||||
}
|
||||
|
||||
private void processBackupAction(ZkNodeProps message, NamedList results) throws IOException, KeeperException, InterruptedException {
|
||||
|
@ -2394,7 +2469,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
addPropertyParams(message, propMap);
|
||||
|
||||
addReplica(clusterState, new ZkNodeProps(propMap), new NamedList());
|
||||
addReplica(clusterState, new ZkNodeProps(propMap), new NamedList(), null);
|
||||
}
|
||||
|
||||
//refresh the location copy of collection state
|
||||
|
@ -2443,7 +2518,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
addPropertyParams(message, propMap);
|
||||
|
||||
addReplica(zkStateReader.getClusterState(), new ZkNodeProps(propMap), results);
|
||||
addReplica(zkStateReader.getClusterState(), new ZkNodeProps(propMap), results, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2503,7 +2578,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
return configName;
|
||||
}
|
||||
|
||||
|
||||
private void validateConfigOrThrowSolrException(String configName) throws KeeperException, InterruptedException {
|
||||
boolean isValid = zkStateReader.getZkClient().exists(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName, true);
|
||||
if(!isValid) {
|
||||
|
@ -2723,4 +2798,19 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (tpe != null) {
|
||||
if (!tpe.isShutdown()) {
|
||||
ExecutorUtil.shutdownAndAwaitTermination(tpe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface Cmd {
|
||||
|
||||
Object call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.function.Predicate;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
|
||||
import org.apache.solr.cloud.Overseer.LeaderStatus;
|
||||
|
@ -115,7 +116,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
|
|||
|
||||
private final Object waitLock = new Object();
|
||||
|
||||
private OverseerMessageHandlerSelector selector;
|
||||
protected OverseerMessageHandlerSelector selector;
|
||||
|
||||
private OverseerNodePrioritizer prioritizer;
|
||||
|
||||
|
@ -328,6 +329,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
|
|||
ExecutorUtil.shutdownAndAwaitTermination(tpe);
|
||||
}
|
||||
}
|
||||
IOUtils.closeQuietly(selector);
|
||||
}
|
||||
|
||||
public static List<String> getSortedOverseerNodeNames(SolrZkClient zk) throws KeeperException, InterruptedException {
|
||||
|
@ -588,7 +590,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
|
|||
* messages only) , or a different handler could be selected based on the
|
||||
* contents of the message.
|
||||
*/
|
||||
public interface OverseerMessageHandlerSelector {
|
||||
public interface OverseerMessageHandlerSelector extends Closeable {
|
||||
OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
|
||||
public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
public ReplaceNodeCmd(OverseerCollectionMessageHandler ocmh) {
|
||||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
ocmh.checkRequired(message, "source", "target");
|
||||
String source = message.getStr("source");
|
||||
String target = message.getStr("target");
|
||||
boolean parallel = message.getBool("parallel", false);
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
|
||||
if (!clusterState.liveNodesContain(source)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + source + " is not live");
|
||||
}
|
||||
if (!clusterState.liveNodesContain(target)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
|
||||
}
|
||||
List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
|
||||
|
||||
List<ZkNodeProps> createdReplicas = new ArrayList<>();
|
||||
|
||||
AtomicBoolean anyOneFailed = new AtomicBoolean(false);
|
||||
CountDownLatch countDownLatch = new CountDownLatch(sourceReplicas.size());
|
||||
|
||||
for (ZkNodeProps sourceReplica : sourceReplicas) {
|
||||
NamedList nl = new NamedList();
|
||||
log.info("going to create replica {}", Utils.toJSONString(sourceReplica));
|
||||
ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
|
||||
final ZkNodeProps addedReplica = ocmh.addReplica(clusterState,
|
||||
msg, nl, () -> {
|
||||
countDownLatch.countDown();
|
||||
if (nl.get("failure") != null) {
|
||||
log.warn("failed to create : " + Utils.toJSONString(msg));
|
||||
// one replica creation failed. Make the best attempt to
|
||||
// delete all the replicas created so far in the target
|
||||
// and exit
|
||||
synchronized (results) {
|
||||
results.add("failure", "Could not create copy of replica " + Utils.toJSONString(sourceReplica));
|
||||
anyOneFailed.set(true);
|
||||
}
|
||||
} else {
|
||||
log.info("successfully created : " + Utils.toJSONString(msg));
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
if (addedReplica != null) {
|
||||
createdReplicas.add(addedReplica);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Waiting for creates to complete ");
|
||||
countDownLatch.await(5, TimeUnit.MINUTES);
|
||||
log.info("Waiting over for creates to complete ");
|
||||
|
||||
if (anyOneFailed.get()) {
|
||||
log.info("failed to create some cores delete all " + Utils.toJSONString(createdReplicas));
|
||||
CountDownLatch cleanupLatch = new CountDownLatch(createdReplicas.size());
|
||||
for (ZkNodeProps createdReplica : createdReplicas) {
|
||||
NamedList deleteResult = new NamedList();
|
||||
try {
|
||||
ocmh.deleteReplica(zkStateReader.getClusterState(), createdReplica.plus("parallel", "true"), deleteResult, () -> {
|
||||
cleanupLatch.countDown();
|
||||
if (deleteResult.get("failure") != null) {
|
||||
synchronized (results) {
|
||||
results.add("failure", "could not cleanup, because : " + deleteResult.get("failure") + " " + Utils.toJSONString(createdReplica));
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (KeeperException e) {
|
||||
cleanupLatch.countDown();
|
||||
log.info("Error deleting ", e);
|
||||
} catch (Exception e) {
|
||||
log.error("Unknown Error deleteing", e);
|
||||
cleanupLatch.countDown();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
cleanupLatch.await(5, TimeUnit.MINUTES);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
// we have reached this far means all replicas could be recreated
|
||||
//now cleanup the replicas in the source node
|
||||
DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ocmh);
|
||||
results.add("success", "REPLACENODE completed successfully from : " + source + " to : " + target);
|
||||
return null;
|
||||
}
|
||||
|
||||
static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
|
||||
List<ZkNodeProps> sourceReplicas = new ArrayList<>();
|
||||
for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
|
||||
for (Slice slice : e.getValue().getSlices()) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
if (source.equals(replica.getNodeName())) {
|
||||
ZkNodeProps props = new ZkNodeProps(
|
||||
COLLECTION_PROP, e.getKey(),
|
||||
SHARD_ID_PROP, slice.getName(),
|
||||
ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
|
||||
ZkStateReader.REPLICA_PROP, replica.getName(),
|
||||
CoreAdminParams.NODE, source);
|
||||
log.info("src_core : {}", Utils.toJSONString(props));
|
||||
sourceReplicas.add(props
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return sourceReplicas;
|
||||
}
|
||||
}
|
|
@ -777,7 +777,10 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
|||
req.getParams().getAll(params, COLL_CONF, REPLICATION_FACTOR, MAX_SHARDS_PER_NODE, STATE_FORMAT, AUTO_ADD_REPLICAS);
|
||||
copyPropertiesWithPrefix(req.getParams(), params, COLL_PROP_PREFIX);
|
||||
return params;
|
||||
});
|
||||
}),
|
||||
|
||||
REPLACENODE_OP(REPLACENODE, (req, rsp, h) -> req.getParams().required().getAll(req.getParams().getAll(null, "parallel"), "source", "target")),
|
||||
DELETENODE_OP(DELETENODE, (req, rsp, h) -> req.getParams().required().getAll(null, "node"));
|
||||
public final CollectionOp fun;
|
||||
CollectionAction action;
|
||||
long timeOut;
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.response.RequestStatusState;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DeleteNodeTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(6)
|
||||
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
protected String getSolrXml() {
|
||||
return "solr.xml";
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
cluster.waitForAllNodes(5000);
|
||||
CloudSolrClient cloudClient = cluster.getSolrClient();
|
||||
String coll = "deletenodetest_coll";
|
||||
Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
|
||||
ArrayList<String> l = new ArrayList<>(liveNodes);
|
||||
Collections.shuffle(l, random());
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 5, 2);
|
||||
create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
|
||||
cloudClient.request(create);
|
||||
String node2bdecommissioned = l.get(0);
|
||||
new CollectionAdminRequest.DeleteNode(node2bdecommissioned).processAsync("003", cloudClient);
|
||||
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("003");
|
||||
boolean success = false;
|
||||
for (int i = 0; i < 200; i++) {
|
||||
CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
|
||||
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
|
||||
Thread.sleep(50);
|
||||
}
|
||||
assertTrue(success);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.client.solrj.response.CoreAdminResponse;
|
||||
import org.apache.solr.client.solrj.response.RequestStatusState;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ReplaceNodeTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(6)
|
||||
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
protected String getSolrXml() {
|
||||
return "solr.xml";
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
cluster.waitForAllNodes(5000);
|
||||
String coll = "replacenodetest_coll";
|
||||
log.info("total_jettys: " + cluster.getJettySolrRunners().size());
|
||||
|
||||
CloudSolrClient cloudClient = cluster.getSolrClient();
|
||||
Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
|
||||
ArrayList<String> l = new ArrayList<>(liveNodes);
|
||||
Collections.shuffle(l, random());
|
||||
String emptyNode = l.remove(0);
|
||||
String node2bdecommissioned = l.get(0);
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 5, 2);
|
||||
create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
|
||||
cloudClient.request(create);
|
||||
log.info("excluded_node : {} ", emptyNode);
|
||||
new CollectionAdminRequest.ReplaceNode(node2bdecommissioned, emptyNode).processAsync("000", cloudClient);
|
||||
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
|
||||
boolean success = false;
|
||||
for (int i = 0; i < 200; i++) {
|
||||
CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
|
||||
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
|
||||
Thread.sleep(50);
|
||||
}
|
||||
assertTrue(success);
|
||||
try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(node2bdecommissioned))) {
|
||||
CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
|
||||
assertTrue(status.getCoreStatus().size() == 0);
|
||||
}
|
||||
|
||||
//let's do it back
|
||||
new CollectionAdminRequest.ReplaceNode(emptyNode, node2bdecommissioned).setParallel(Boolean.TRUE).processAsync("001", cloudClient);
|
||||
requestStatus = CollectionAdminRequest.requestStatus("001");
|
||||
|
||||
for (int i = 0; i < 200; i++) {
|
||||
CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
|
||||
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
|
||||
Thread.sleep(50);
|
||||
}
|
||||
assertTrue(success);
|
||||
try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(emptyNode))) {
|
||||
CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
|
||||
assertTrue(status.getCoreStatus().size() == 0);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -112,7 +112,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
* @deprecated Use {@link #processAsync(String, SolrClient)} or {@link #processAsync(SolrClient)}
|
||||
*/
|
||||
@Deprecated
|
||||
public abstract AsyncCollectionAdminRequest setAsyncId(String id);
|
||||
public AsyncCollectionAdminRequest setAsyncId(String id){return this;};
|
||||
|
||||
/**
|
||||
* Process this request asynchronously, generating and returning a request id
|
||||
|
@ -491,6 +491,56 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
}
|
||||
}
|
||||
|
||||
public static class DeleteNode extends AsyncCollectionAdminRequest {
|
||||
String node;
|
||||
|
||||
/**
|
||||
* @param node The node to be deleted
|
||||
*/
|
||||
public DeleteNode(String node) {
|
||||
super(CollectionAction.DELETENODE);
|
||||
this.node = node;
|
||||
}
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set("node", node);
|
||||
return params;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
public static class ReplaceNode extends AsyncCollectionAdminRequest {
|
||||
String source, target;
|
||||
Boolean parallel;
|
||||
|
||||
/**
|
||||
* @param source node to be cleaned up
|
||||
* @param target node where the new replicas are to be created
|
||||
*/
|
||||
public ReplaceNode(String source, String target) {
|
||||
super(CollectionAction.REPLACENODE);
|
||||
this.source = source;
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
public ReplaceNode setParallel(Boolean flag) {
|
||||
this.parallel = flag;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set("source", source);
|
||||
params.set("target", target);
|
||||
if (parallel != null) params.set("parallel", parallel.toString());
|
||||
return params;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns a RebalanceLeaders object to rebalance leaders for a collection
|
||||
*/
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.solr.common.util.Utils;
|
|||
import org.noggit.JSONUtil;
|
||||
import org.noggit.JSONWriter;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -40,6 +41,17 @@ public class ZkNodeProps implements JSONWriter.Writable {
|
|||
// Always wrapping introduces a memory leak.
|
||||
}
|
||||
|
||||
public ZkNodeProps plus(String key , Object val) {
|
||||
return plus(Collections.singletonMap(key,val));
|
||||
}
|
||||
|
||||
public ZkNodeProps plus(Map<String, Object> newVals) {
|
||||
LinkedHashMap<String, Object> copy = new LinkedHashMap<>(propMap);
|
||||
if (newVals == null || newVals.isEmpty()) return new ZkNodeProps(copy);
|
||||
copy.putAll(newVals);
|
||||
return new ZkNodeProps(copy);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Constructor that populates the from array of Strings in form key1, value1,
|
||||
|
|
|
@ -96,6 +96,9 @@ public interface CollectionParams {
|
|||
// but the overseer is aware of these tasks
|
||||
MOCK_COLL_TASK(false, LockLevel.COLLECTION),
|
||||
MOCK_SHARD_TASK(false, LockLevel.SHARD),
|
||||
//TODO when we have a node level lock use it here
|
||||
REPLACENODE(true, LockLevel.NONE),
|
||||
DELETENODE(true, LockLevel.NONE),
|
||||
MOCK_REPLICA_TASK(false, LockLevel.REPLICA)
|
||||
;
|
||||
public final boolean isWrite;
|
||||
|
|
Loading…
Reference in New Issue