From d84977eb5cde00f0e92f71837bdf9cee25e0b54a Mon Sep 17 00:00:00 2001
From: Ishan Chattopadhyaya
Date: Sat, 29 Aug 2020 02:13:13 +0530
Subject: [PATCH] SOLR-14616: Remove CDCR
---
solr/CHANGES.txt | 2 +
.../apache/solr/cloud/RecoveryStrategy.java | 7 -
.../solr/handler/CdcrBufferManager.java | 71 --
.../solr/handler/CdcrBufferStateManager.java | 178 ----
.../solr/handler/CdcrLeaderStateManager.java | 162 ----
.../org/apache/solr/handler/CdcrParams.java | 256 -----
.../solr/handler/CdcrProcessStateManager.java | 178 ----
.../apache/solr/handler/CdcrReplicator.java | 258 -----
.../solr/handler/CdcrReplicatorManager.java | 441 ---------
.../solr/handler/CdcrReplicatorScheduler.java | 116 ---
.../solr/handler/CdcrReplicatorState.java | 299 ------
.../solr/handler/CdcrRequestHandler.java | 880 -----------------
.../apache/solr/handler/CdcrStateManager.java | 47 -
.../handler/CdcrUpdateLogSynchronizer.java | 192 ----
.../org/apache/solr/handler/IndexFetcher.java | 124 +--
.../solr/handler/ReplicationHandler.java | 39 -
.../component/RealTimeGetComponent.java | 15 +-
.../solr/update/CdcrTransactionLog.java | 401 --------
.../org/apache/solr/update/CdcrUpdateLog.java | 796 ---------------
.../solr/update/DefaultSolrCoreState.java | 41 -
.../org/apache/solr/update/SolrCoreState.java | 17 -
.../org/apache/solr/update/UpdateLog.java | 3 +-
.../update/processor/CdcrUpdateProcessor.java | 132 ---
.../processor/CdcrUpdateProcessorFactory.java | 46 -
.../solr/collection1/conf/solrconfig-cdcr.xml | 77 --
.../conf/solrconfig-cdcrupdatelog.xml | 49 -
.../cdcr-cluster1/conf/managed-schema | 29 -
.../cdcr-cluster1/conf/solrconfig.xml | 80 --
.../cdcr-cluster2/conf/managed-schema | 29 -
.../cdcr-cluster2/conf/solrconfig.xml | 80 --
.../cdcr-source-disabled/conf/schema.xml | 29 -
.../cdcr-source-disabled/conf/solrconfig.xml | 60 --
.../configsets/cdcr-source/conf/schema.xml | 29 -
.../cdcr-source/conf/solrconfig.xml | 75 --
.../configsets/cdcr-target/conf/schema.xml | 29 -
.../cdcr-target/conf/solrconfig.xml | 62 --
.../cloud/cdcr/BaseCdcrDistributedZkTest.java | 906 ------------------
.../cloud/cdcr/CdcrBidirectionalTest.java | 244 -----
.../solr/cloud/cdcr/CdcrBootstrapTest.java | 373 -------
.../cloud/cdcr/CdcrOpsAndBoundariesTest.java | 332 -------
.../cdcr/CdcrReplicationHandlerTest.java | 332 -------
.../cloud/cdcr/CdcrRequestHandlerTest.java | 183 ----
.../apache/solr/cloud/cdcr/CdcrTestsUtil.java | 274 ------
.../cdcr/CdcrVersionReplicationTest.java | 307 ------
.../cloud/cdcr/CdcrWithNodesRestartsTest.java | 359 -------
.../apache/solr/search/TestRealTimeGet.java | 1 -
.../org/apache/solr/search/TestRecovery.java | 1 -
.../solr/search/TestStressRecovery.java | 1 -
.../apache/solr/update/CdcrUpdateLogTest.java | 783 ---------------
.../update/TestInPlaceUpdatesDistrib.java | 2 -
solr/solr-ref-guide/src/aliases.adoc | 2 -
solr/solr-ref-guide/src/cdcr-api.adoc | 321 -------
.../solr-ref-guide/src/cdcr-architecture.adoc | 167 ----
solr/solr-ref-guide/src/cdcr-config.adoc | 376 --------
solr/solr-ref-guide/src/cdcr-operations.adoc | 49 -
.../cross-data-center-replication-cdcr.adoc | 63 --
.../major-changes-from-solr-5-to-solr-6.adoc | 2 +-
solr/solr-ref-guide/src/solrcloud.adoc | 5 +-
.../java/org/apache/solr/SolrTestCaseJ4.java | 14 -
59 files changed, 11 insertions(+), 10415 deletions(-)
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrParams.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
delete mode 100644 solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
delete mode 100644 solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
delete mode 100644 solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
delete mode 100644 solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
delete mode 100644 solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java
delete mode 100644 solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml
delete mode 100644 solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-cluster1/conf/managed-schema
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-cluster1/conf/solrconfig.xml
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-cluster2/conf/managed-schema
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-cluster2/conf/solrconfig.xml
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-source-disabled/conf/schema.xml
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-source-disabled/conf/solrconfig.xml
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-source/conf/schema.xml
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-source/conf/solrconfig.xml
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-target/conf/schema.xml
delete mode 100644 solr/core/src/test-files/solr/configsets/cdcr-target/conf/solrconfig.xml
delete mode 100644 solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java
delete mode 100644 solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrBidirectionalTest.java
delete mode 100644 solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrBootstrapTest.java
delete mode 100644 solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrOpsAndBoundariesTest.java
delete mode 100644 solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrReplicationHandlerTest.java
delete mode 100644 solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrRequestHandlerTest.java
delete mode 100644 solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java
delete mode 100644 solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrVersionReplicationTest.java
delete mode 100644 solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrWithNodesRestartsTest.java
delete mode 100644 solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java
delete mode 100644 solr/solr-ref-guide/src/cdcr-api.adoc
delete mode 100644 solr/solr-ref-guide/src/cdcr-architecture.adoc
delete mode 100644 solr/solr-ref-guide/src/cdcr-config.adoc
delete mode 100644 solr/solr-ref-guide/src/cdcr-operations.adoc
delete mode 100644 solr/solr-ref-guide/src/cross-data-center-replication-cdcr.adoc
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 018d8930a40..d376597aef8 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -48,6 +48,8 @@ Other Changes
----------------------
* SOLR-14656: Autoscaling framework removed (Ishan Chattopadhyaya, noble, Ilan Ginzburg)
+* SOLR-14616: CDCR support removed (Ishan Chattopadhyaya)
+
* LUCENE-9391: Upgrade HPPC to 0.8.2. (Haoyu Zhai)
* SOLR-10288: Remove non-minified JavaScript from the webapp. (Erik Hatcher, marcussorealheis)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index c2db3f4d30d..a3e2f7e3ba8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -58,7 +58,6 @@ import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.PeerSyncWithLeader;
import org.apache.solr.update.UpdateLog;
@@ -241,12 +240,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(ReplicationHandler.LEADER_URL, leaderUrl);
solrParams.set(ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, replicaType == Replica.Type.TLOG);
- // always download the tlogs from the leader when running with cdcr enabled. We need to have all the tlogs
- // to ensure leader failover doesn't cause missing docs on the target
- if (core.getUpdateHandler().getUpdateLog() != null
- && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
- solrParams.set(ReplicationHandler.TLOG_FILES, true);
- }
if (isClosed()) return; // we check closed on return
boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful();
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java
deleted file mode 100644
index 86963796c25..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.update.CdcrUpdateLog;
-
-/**
- * This manager is responsible in enabling or disabling the buffering of the update logs. Currently, buffer
- * is always activated for non-leader nodes. For leader nodes, it is enabled only if the user explicitly
- * enabled it with the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#ENABLEBUFFER}.
- */
-class CdcrBufferManager implements CdcrStateManager.CdcrStateObserver {
-
- private CdcrLeaderStateManager leaderStateManager;
- private CdcrBufferStateManager bufferStateManager;
-
- private final SolrCore core;
-
- CdcrBufferManager(SolrCore core) {
- this.core = core;
- }
-
- void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
- this.leaderStateManager = leaderStateManager;
- this.leaderStateManager.register(this);
- }
-
- void setBufferStateManager(final CdcrBufferStateManager bufferStateManager) {
- this.bufferStateManager = bufferStateManager;
- this.bufferStateManager.register(this);
- }
-
- /**
- * This method is synchronised as it can both be called by the leaderStateManager and the bufferStateManager.
- */
- @Override
- public synchronized void stateUpdate() {
- CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
-
- // If I am not the leader, I should always buffer my updates
- if (!leaderStateManager.amILeader()) {
- ulog.enableBuffer();
- return;
- }
- // If I am the leader, I should buffer my updates only if buffer is enabled
- else if (bufferStateManager.getState().equals(CdcrParams.BufferState.ENABLED)) {
- ulog.enableBuffer();
- return;
- }
-
- // otherwise, disable the buffer
- ulog.disableBuffer();
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java
deleted file mode 100644
index 49d19f1ad22..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.core.SolrCore;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.Charset;
-
-/**
- * Manage the state of the update log buffer. It is responsible of synchronising the state
- * through Zookeeper. The state of the buffer is stored in the zk node defined by {@link #getZnodePath()}.
- * @deprecated since 8.6
- */
-@Deprecated(since = "8.6")
-class CdcrBufferStateManager extends CdcrStateManager {
-
- private CdcrParams.BufferState state = DEFAULT_STATE;
-
- private BufferStateWatcher wrappedWatcher;
- private Watcher watcher;
-
- private SolrCore core;
-
- static CdcrParams.BufferState DEFAULT_STATE = CdcrParams.BufferState.ENABLED;
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- CdcrBufferStateManager(final SolrCore core, SolrParams bufferConfiguration) {
- this.core = core;
-
- // Ensure that the state znode exists
- this.createStateNode();
-
- // set default state
- if (bufferConfiguration != null) {
- byte[] defaultState = bufferConfiguration.get(
- CdcrParams.DEFAULT_STATE_PARAM, DEFAULT_STATE.toLower()).getBytes(Charset.forName("UTF-8"));
- state = CdcrParams.BufferState.get(defaultState);
- }
- this.setState(state); // notify observers
-
- // Startup and register the watcher at startup
- try {
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- watcher = this.initWatcher(zkClient);
- this.setState(CdcrParams.BufferState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed fetching initial state", e);
- }
- }
-
- /**
- * SolrZkClient does not guarantee that a watch object will only be triggered once for a given notification
- * if we does not wrap the watcher - see SOLR-6621.
- */
- private Watcher initWatcher(SolrZkClient zkClient) {
- wrappedWatcher = new BufferStateWatcher();
- return zkClient.wrapWatcher(wrappedWatcher);
- }
-
- private String getZnodeBase() {
- return "/collections/" + core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + "/cdcr/state";
- }
-
- private String getZnodePath() {
- return getZnodeBase() + "/buffer";
- }
-
- void setState(CdcrParams.BufferState state) {
- if (this.state != state) {
- this.state = state;
- this.callback(); // notify the observers of a state change
- }
- }
-
- CdcrParams.BufferState getState() {
- return state;
- }
-
- /**
- * Synchronise the state to Zookeeper. This method must be called only by the handler receiving the
- * action.
- */
- void synchronize() {
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- try {
- zkClient.setData(this.getZnodePath(), this.getState().getBytes(), true);
- // check if nobody changed it in the meantime, and set a new watcher
- this.setState(CdcrParams.BufferState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed synchronising new state", e);
- }
- }
-
- private void createStateNode() {
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- try {
- if (!zkClient.exists(this.getZnodePath(), true)) {
- if (!zkClient.exists(this.getZnodeBase(), true)) {
- zkClient.makePath(this.getZnodeBase(), null, CreateMode.PERSISTENT, null, false, true); // Should be a no-op if node exists
- }
- zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
- if (log.isInfoEnabled()) {
- log.info("Created znode {}", this.getZnodePath());
- }
- }
- } catch (KeeperException.NodeExistsException ne) {
- // Someone got in first and created the node.
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed to create CDCR buffer state node", e);
- }
- }
-
- void shutdown() {
- if (wrappedWatcher != null) {
- wrappedWatcher.cancel(); // cancel the watcher to avoid spurious warn messages during shutdown
- }
- }
-
- private class BufferStateWatcher implements Watcher {
-
- private boolean isCancelled = false;
-
- /**
- * Cancel the watcher to avoid spurious warn messages during shutdown.
- */
- void cancel() {
- isCancelled = true;
- }
-
- @Override
- public void process(WatchedEvent event) {
- if (isCancelled) return; // if the watcher is cancelled, do nothing.
- String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-
- log.info("The CDCR buffer state has changed: {} @ {}:{}", event, collectionName, shard);
- // session events are not change events, and do not remove the watcher
- if (Event.EventType.None.equals(event.getType())) {
- return;
- }
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- try {
- CdcrParams.BufferState state = CdcrParams.BufferState.get(zkClient.getData(CdcrBufferStateManager.this.getZnodePath(), watcher, null, true));
- log.info("Received new CDCR buffer state from watcher: {} @ {}:{}", state, collectionName, shard);
- CdcrBufferStateManager.this.setState(state);
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed synchronising new state @ {}:{}", collectionName, shard, e);
- }
- }
-
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java
deleted file mode 100644
index c9bc5fdad3e..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.lang.invoke.MethodHandles;
-
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.core.SolrCore;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * Manage the leader state of the CDCR nodes.
- *
- *
- * It takes care of notifying the {@link CdcrReplicatorManager} in case
- * of a leader state change.
- *
- * @deprecated since 8.6
- */
-@Deprecated(since = "8.6")
-class CdcrLeaderStateManager extends CdcrStateManager {
-
- private boolean amILeader = false;
-
- private LeaderStateWatcher wrappedWatcher;
- private Watcher watcher;
-
- private SolrCore core;
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- CdcrLeaderStateManager(final SolrCore core) {
- this.core = core;
-
- // Fetch leader state and register the watcher at startup
- try {
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- ClusterState clusterState = core.getCoreContainer().getZkController().getClusterState();
-
- watcher = this.initWatcher(zkClient);
- // if the node does not exist, it means that the leader was not yet registered. This can happen
- // when the cluster is starting up. The core is not yet fully loaded, and the leader election process
- // is waiting for it.
- if (this.isLeaderRegistered(zkClient, clusterState)) {
- this.checkIfIAmLeader();
- }
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed fetching initial leader state and setting watch", e);
- }
- }
-
- /**
- * Checks if the leader is registered. If it is not registered, we are probably at the
- * initialisation phase of the cluster. In this case, we must attach a watcher to
- * be notified when the leader is registered.
- */
- private boolean isLeaderRegistered(SolrZkClient zkClient, ClusterState clusterState)
- throws KeeperException, InterruptedException {
- // First check if the znode exists, and register the watcher at the same time
- return zkClient.exists(this.getZnodePath(), watcher, true) != null;
- }
-
- /**
- * SolrZkClient does not guarantee that a watch object will only be triggered once for a given notification
- * if we does not wrap the watcher - see SOLR-6621.
- */
- private Watcher initWatcher(SolrZkClient zkClient) {
- wrappedWatcher = new LeaderStateWatcher();
- return zkClient.wrapWatcher(wrappedWatcher);
- }
-
- private void checkIfIAmLeader() throws KeeperException, InterruptedException {
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- ZkNodeProps props = ZkNodeProps.load(zkClient.getData(CdcrLeaderStateManager.this.getZnodePath(), null, null, true));
- if (props != null) {
- CdcrLeaderStateManager.this.setAmILeader(props.get("core").equals(core.getName()));
- }
- }
-
- private String getZnodePath() {
- String myShardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
- String myCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- return "/collections/" + myCollection + "/leaders/" + myShardId + "/leader";
- }
-
- void setAmILeader(boolean amILeader) {
- if (this.amILeader != amILeader) {
- this.amILeader = amILeader;
- this.callback(); // notify the observers of a state change
- }
- }
-
- boolean amILeader() {
- return amILeader;
- }
-
- void shutdown() {
- if (wrappedWatcher != null) {
- wrappedWatcher.cancel(); // cancel the watcher to avoid spurious warn messages during shutdown
- }
- }
-
- private class LeaderStateWatcher implements Watcher {
-
- private boolean isCancelled = false;
-
- /**
- * Cancel the watcher to avoid spurious warn messages during shutdown.
- */
- void cancel() {
- isCancelled = true;
- }
-
- @Override
- public void process(WatchedEvent event) {
- if (isCancelled) return; // if the watcher is cancelled, do nothing.
- String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-
- log.debug("The leader state has changed: {} @ {}:{}", event, collectionName, shard);
- // session events are not change events, and do not remove the watcher
- if (Event.EventType.None.equals(event.getType())) {
- return;
- }
-
- try {
- log.info("Received new leader state @ {}:{}", collectionName, shard);
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- ClusterState clusterState = core.getCoreContainer().getZkController().getClusterState();
- if (CdcrLeaderStateManager.this.isLeaderRegistered(zkClient, clusterState)) {
- CdcrLeaderStateManager.this.checkIfIAmLeader();
- }
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed updating leader state and setting watch @ {}: {}", collectionName, shard, e);
- }
- }
-
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrParams.java b/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
deleted file mode 100644
index 3f65b90585f..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.nio.charset.Charset;
-import java.util.Locale;
-
-public class CdcrParams {
-
- /**
- * The definition of a replica configuration *
- */
- public static final String REPLICA_PARAM = "replica";
-
- /**
- * The source collection of a replica *
- */
- public static final String SOURCE_COLLECTION_PARAM = "source";
-
- /**
- * The target collection of a replica *
- */
- public static final String TARGET_COLLECTION_PARAM = "target";
-
- /**
- * The Zookeeper host of the target cluster hosting the replica *
- */
- public static final String ZK_HOST_PARAM = "zkHost";
-
- /**
- * The definition of the {@link org.apache.solr.handler.CdcrReplicatorScheduler} configuration *
- */
- public static final String REPLICATOR_PARAM = "replicator";
-
- /**
- * The thread pool size of the replicator *
- */
- public static final String THREAD_POOL_SIZE_PARAM = "threadPoolSize";
-
- /**
- * The time schedule (in ms) of the replicator *
- */
- public static final String SCHEDULE_PARAM = "schedule";
-
- /**
- * The batch size of the replicator *
- */
- public static final String BATCH_SIZE_PARAM = "batchSize";
-
- /**
- * The definition of the {@link org.apache.solr.handler.CdcrUpdateLogSynchronizer} configuration *
- */
- public static final String UPDATE_LOG_SYNCHRONIZER_PARAM = "updateLogSynchronizer";
-
- /**
- * The definition of the {@link org.apache.solr.handler.CdcrBufferManager} configuration *
- */
- public static final String BUFFER_PARAM = "buffer";
-
- /**
- * The default state at startup of the buffer *
- */
- public static final String DEFAULT_STATE_PARAM = "defaultState";
-
- /**
- * The latest update checkpoint on a target cluster *
- */
- public final static String CHECKPOINT = "checkpoint";
-
- /**
- * The last processed version on a source cluster *
- */
- public final static String LAST_PROCESSED_VERSION = "lastProcessedVersion";
-
- /**
- * A list of replica queues on a source cluster *
- */
- public final static String QUEUES = "queues";
-
- /**
- * The size of a replica queue on a source cluster *
- */
- public final static String QUEUE_SIZE = "queueSize";
-
- /**
- * The timestamp of the last processed operation in a replica queue *
- */
- public final static String LAST_TIMESTAMP = "lastTimestamp";
-
- /**
- * A list of qps statistics per collection *
- */
- public final static String OPERATIONS_PER_SECOND = "operationsPerSecond";
-
- /**
- * Overall counter *
- */
- public final static String COUNTER_ALL = "all";
-
- /**
- * Counter for Adds *
- */
- public final static String COUNTER_ADDS = "adds";
-
- /**
- * Counter for Deletes *
- */
- public final static String COUNTER_DELETES = "deletes";
-
- /**
- * Counter for Bootstrap operations *
- */
- public final static String COUNTER_BOOTSTRAP = "bootstraps";
-
- /**
- * A list of errors per target collection *
- */
- public final static String ERRORS = "errors";
-
- /**
- * Counter for consecutive errors encountered by a replicator thread *
- */
- public final static String CONSECUTIVE_ERRORS = "consecutiveErrors";
-
- /**
- * A list of the last errors encountered by a replicator thread *
- */
- public final static String LAST = "last";
-
- /**
- * Total size of transaction logs *
- */
- public final static String TLOG_TOTAL_SIZE = "tlogTotalSize";
-
- /**
- * Total count of transaction logs *
- */
- public final static String TLOG_TOTAL_COUNT = "tlogTotalCount";
-
- /**
- * The state of the update log synchronizer *
- */
- public final static String UPDATE_LOG_SYNCHRONIZER = "updateLogSynchronizer";
-
- /**
- * The actions supported by the CDCR API
- */
- public enum CdcrAction {
- START,
- STOP,
- STATUS,
- COLLECTIONCHECKPOINT,
- SHARDCHECKPOINT,
- ENABLEBUFFER,
- DISABLEBUFFER,
- LASTPROCESSEDVERSION,
- QUEUES,
- OPS,
- ERRORS,
- BOOTSTRAP,
- BOOTSTRAP_STATUS,
- CANCEL_BOOTSTRAP;
-
- public static CdcrAction get(String p) {
- if (p != null) {
- try {
- return CdcrAction.valueOf(p.toUpperCase(Locale.ROOT));
- } catch (Exception e) {
- }
- }
- return null;
- }
-
- public String toLower() {
- return toString().toLowerCase(Locale.ROOT);
- }
-
- }
-
- /**
- * The possible states of the CDCR process
- */
- public enum ProcessState {
- STARTED,
- STOPPED;
-
- public static ProcessState get(byte[] state) {
- if (state != null) {
- try {
- return ProcessState.valueOf(new String(state, Charset.forName("UTF-8")).toUpperCase(Locale.ROOT));
- } catch (Exception e) {
- }
- }
- return null;
- }
-
- public String toLower() {
- return toString().toLowerCase(Locale.ROOT);
- }
-
- public byte[] getBytes() {
- return toLower().getBytes(Charset.forName("UTF-8"));
- }
-
- public static String getParam() {
- return "process";
- }
-
- }
-
- /**
- * The possible states of the CDCR buffer
- */
- public enum BufferState {
- ENABLED,
- DISABLED;
-
- public static BufferState get(byte[] state) {
- if (state != null) {
- try {
- return BufferState.valueOf(new String(state, Charset.forName("UTF-8")).toUpperCase(Locale.ROOT));
- } catch (Exception e) {
- }
- }
- return null;
- }
-
- public String toLower() {
- return toString().toLowerCase(Locale.ROOT);
- }
-
- public byte[] getBytes() {
- return toLower().getBytes(Charset.forName("UTF-8"));
- }
-
- public static String getParam() {
- return "buffer";
- }
-
- }
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java
deleted file mode 100644
index 6506030bbce..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.lang.invoke.MethodHandles;
-
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.core.SolrCore;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * Manage the life-cycle state of the CDCR process. It is responsible of synchronising the state
- * through Zookeeper. The state of the CDCR process is stored in the zk node defined by {@link #getZnodePath()}.
- *
- *
- * It takes care of notifying the {@link CdcrReplicatorManager} in case
- * of a process state change.
- *
- * @deprecated since 8.6
- */
-@Deprecated(since = "8.6")
-class CdcrProcessStateManager extends CdcrStateManager {
-
- private CdcrParams.ProcessState state = DEFAULT_STATE;
-
- private ProcessStateWatcher wrappedWatcher;
- private Watcher watcher;
-
- private SolrCore core;
-
- /**
- * The default state must be STOPPED. See comments in
- * {@link #setState(org.apache.solr.handler.CdcrParams.ProcessState)}.
- */
- static CdcrParams.ProcessState DEFAULT_STATE = CdcrParams.ProcessState.STOPPED;
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- CdcrProcessStateManager(final SolrCore core) {
- this.core = core;
-
- // Ensure that the status znode exists
- this.createStateNode();
-
- // Register the watcher at startup
- try {
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- watcher = this.initWatcher(zkClient);
- this.setState(CdcrParams.ProcessState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed fetching initial state", e);
- }
- }
-
- /**
- * SolrZkClient does not guarantee that a watch object will only be triggered once for a given notification
- * if we does not wrap the watcher - see SOLR-6621.
- */
- private Watcher initWatcher(SolrZkClient zkClient) {
- wrappedWatcher = new ProcessStateWatcher();
- return zkClient.wrapWatcher(wrappedWatcher);
- }
-
- private String getZnodeBase() {
- return "/collections/" + core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + "/cdcr/state";
- }
-
- private String getZnodePath() {
- return getZnodeBase() + "/process";
- }
-
- void setState(CdcrParams.ProcessState state) {
- if (this.state != state) {
- this.state = state;
- this.callback(); // notify the observers of a state change
- }
- }
-
- CdcrParams.ProcessState getState() {
- return state;
- }
-
- /**
- * Synchronise the state to Zookeeper. This method must be called only by the handler receiving the
- * action.
- */
- void synchronize() {
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- try {
- zkClient.setData(this.getZnodePath(), this.getState().getBytes(), true);
- // check if nobody changed it in the meantime, and set a new watcher
- this.setState(CdcrParams.ProcessState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed synchronising new state", e);
- }
- }
-
- private void createStateNode() {
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- try {
- if (!zkClient.exists(this.getZnodePath(), true)) {
- if (!zkClient.exists(this.getZnodeBase(), true)) { // Should be a no-op if the node exists
- zkClient.makePath(this.getZnodeBase(), null, CreateMode.PERSISTENT, null, false, true);
- }
- zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
- if (log.isInfoEnabled()) {
- log.info("Created znode {}", this.getZnodePath());
- }
- }
- } catch (KeeperException.NodeExistsException ne) {
- // Someone got in first and created the node.
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed to create CDCR process state node", e);
- }
- }
-
- void shutdown() {
- if (wrappedWatcher != null) {
- wrappedWatcher.cancel(); // cancel the watcher to avoid spurious warn messages during shutdown
- }
- }
-
- private class ProcessStateWatcher implements Watcher {
-
- private boolean isCancelled = false;
-
- /**
- * Cancel the watcher to avoid spurious warn messages during shutdown.
- */
- void cancel() {
- isCancelled = true;
- }
-
- @Override
- public void process(WatchedEvent event) {
- if (isCancelled) return; // if the watcher is cancelled, do nothing.
- String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-
- log.info("The CDCR process state has changed: {} @ {}:{}", event, collectionName, shard);
- // session events are not change events, and do not remove the watcher
- if (Event.EventType.None.equals(event.getType())) {
- return;
- }
- SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
- try {
- CdcrParams.ProcessState state = CdcrParams.ProcessState.get(zkClient.getData(CdcrProcessStateManager.this.getZnodePath(), watcher, null, true));
- log.info("Received new CDCR process state from watcher: {} @ {}:{}", state, collectionName, shard);
- CdcrProcessStateManager.this.setState(state);
- } catch (KeeperException | InterruptedException e) {
- log.warn("Failed synchronising new state @ {}: {}", collectionName, shard, e);
- }
- }
-
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
deleted file mode 100644
index 936750e430a..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.processor.CdcrUpdateProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
-
-/**
- * The replication logic. Given a {@link org.apache.solr.handler.CdcrReplicatorState}, it reads all the new entries
- * in the update log and forward them to the target cluster. If an error occurs, the replication is stopped and
- * will be tried again later.
- * @deprecated since 8.6
- */
-@Deprecated(since = "8.6")
-public class CdcrReplicator implements Runnable {
-
- private final CdcrReplicatorState state;
- private final int batchSize;
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public CdcrReplicator(CdcrReplicatorState state, int batchSize) {
- this.state = state;
- this.batchSize = batchSize;
- }
-
- @Override
- public void run() {
- CdcrUpdateLog.CdcrLogReader logReader = state.getLogReader();
- CdcrUpdateLog.CdcrLogReader subReader = null;
- if (logReader == null) {
- log.warn("Log reader for target {} is not initialised, it will be ignored.", state.getTargetCollection());
- return;
- }
-
- try {
- // create update request
- UpdateRequest req = new UpdateRequest();
- // Add the param to indicate the {@link CdcrUpdateProcessor} to keep the provided version number
- req.setParam(CdcrUpdateProcessor.CDCR_UPDATE, "");
-
- // Start the benchmark timer
- state.getBenchmarkTimer().start();
-
- long counter = 0;
- subReader = logReader.getSubReader();
-
- for (int i = 0; i < batchSize; i++) {
- Object o = subReader.next();
- if (o == null) break; // we have reached the end of the update logs, we should close the batch
-
- if (isTargetCluster(o)) {
- continue;
- }
-
- if (isDelete(o)) {
-
- /*
- * Deletes are sent one at a time.
- */
-
- // First send out current batch of SolrInputDocument, the non-deletes.
- List docs = req.getDocuments();
-
- if (docs != null && docs.size() > 0) {
- subReader.resetToLastPosition(); // Push back the delete for now.
- this.sendRequest(req); // Send the batch update request
- logReader.forwardSeek(subReader); // Advance the main reader to just before the delete.
- o = subReader.next(); // Read the delete again
- counter += docs.size();
- req.clear();
- }
-
- // Process Delete
- this.processUpdate(o, req);
- this.sendRequest(req);
- logReader.forwardSeek(subReader);
- counter++;
- req.clear();
-
- } else {
-
- this.processUpdate(o, req);
-
- }
- }
-
- //Send the final batch out.
- List docs = req.getDocuments();
-
- if ((docs != null && docs.size() > 0)) {
- this.sendRequest(req);
- counter += docs.size();
- }
-
- // we might have read a single commit operation and reached the end of the update logs
- logReader.forwardSeek(subReader);
-
- if (log.isInfoEnabled()) {
- log.info("Forwarded {} updates to target {}", counter, state.getTargetCollection());
- }
- } catch (Exception e) {
- // report error and update error stats
- this.handleException(e);
- } finally {
- // stop the benchmark timer
- state.getBenchmarkTimer().stop();
- // ensure that the subreader is closed and the associated pointer is removed
- if (subReader != null) subReader.close();
- }
- }
-
- private void sendRequest(UpdateRequest req) throws IOException, SolrServerException, CdcrReplicatorException {
- UpdateResponse rsp = req.process(state.getClient());
- if (rsp.getStatus() != 0) {
- throw new CdcrReplicatorException(req, rsp);
- }
- state.resetConsecutiveErrors();
- }
-
- /** check whether the update read from TLog is received from source
- * or received via solr client
- */
- private boolean isTargetCluster(Object o) {
- @SuppressWarnings({"rawtypes"})
- List entry = (List) o;
- int operationAndFlags = (Integer) entry.get(0);
- int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
- Boolean isTarget = false;
- if (oper == UpdateLog.DELETE_BY_QUERY || oper == UpdateLog.DELETE) {
- if (entry.size() == 4) { //back-combat - skip for previous versions
- isTarget = (Boolean) entry.get(entry.size() - 1);
- }
- } else if (oper == UpdateLog.UPDATE_INPLACE) {
- if (entry.size() == 6) { //back-combat - skip for previous versions
- isTarget = (Boolean) entry.get(entry.size() - 2);
- }
- } else if (oper == UpdateLog.ADD) {
- if (entry.size() == 4) { //back-combat - skip for previous versions
- isTarget = (Boolean) entry.get(entry.size() - 2);
- }
- }
- return isTarget;
- }
-
- private boolean isDelete(Object o) {
- @SuppressWarnings({"rawtypes"})
- List entry = (List) o;
- int operationAndFlags = (Integer) entry.get(0);
- int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
- return oper == UpdateLog.DELETE_BY_QUERY || oper == UpdateLog.DELETE;
- }
-
- private void handleException(Exception e) {
- if (e instanceof CdcrReplicatorException) {
- UpdateRequest req = ((CdcrReplicatorException) e).req;
- UpdateResponse rsp = ((CdcrReplicatorException) e).rsp;
- log.warn("Failed to forward update request {} to target: {}. Got response {}", req, state.getTargetCollection(), rsp);
- state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
- } else if (e instanceof CloudSolrClient.RouteException) {
- log.warn("Failed to forward update request to target: {}", state.getTargetCollection(), e);
- state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
- } else {
- log.warn("Failed to forward update request to target: {}", state.getTargetCollection(), e);
- state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
- }
- }
-
- private UpdateRequest processUpdate(Object o, UpdateRequest req) {
-
- // should currently be a List
- @SuppressWarnings({"rawtypes"})
- List entry = (List) o;
-
- int operationAndFlags = (Integer) entry.get(0);
- int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
- long version = (Long) entry.get(1);
-
- // record the operation in the benchmark timer
- state.getBenchmarkTimer().incrementCounter(oper);
-
- switch (oper) {
- case UpdateLog.ADD: {
- // the version is already attached to the document
- SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
- req.add(sdoc);
- return req;
- }
- case UpdateLog.DELETE: {
- byte[] idBytes = (byte[]) entry.get(2);
- req.deleteById(new String(idBytes, Charset.forName("UTF-8")));
- req.setParam(VERSION_FIELD, Long.toString(version));
- return req;
- }
-
- case UpdateLog.DELETE_BY_QUERY: {
- String query = (String) entry.get(2);
- req.deleteByQuery(query);
- req.setParam(VERSION_FIELD, Long.toString(version));
- return req;
- }
-
- case UpdateLog.COMMIT: {
- return null;
- }
-
- default:
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
- }
- }
-
- /**
- * Exception to catch update request issues with the target cluster.
- */
- public static class CdcrReplicatorException extends Exception {
-
- private final UpdateRequest req;
- private final UpdateResponse rsp;
-
- public CdcrReplicatorException(UpdateRequest req, UpdateResponse rsp) {
- this.req = req;
- this.rsp = rsp;
- }
-
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
deleted file mode 100644
index 760e52716cb..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.http.client.HttpClient;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.util.TimeOut;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
-
-@Deprecated(since = "8.6")
-class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
-
- private static final int MAX_BOOTSTRAP_ATTEMPTS = 5;
- private static final int BOOTSTRAP_RETRY_DELAY_MS = 2000;
- // 6 hours is hopefully long enough for most indexes
- private static final long BOOTSTRAP_TIMEOUT_SECONDS = 6L * 3600L * 3600L;
-
- private List replicatorStates;
-
- private final CdcrReplicatorScheduler scheduler;
- private CdcrProcessStateManager processStateManager;
- private CdcrLeaderStateManager leaderStateManager;
-
- private SolrCore core;
- private String path;
-
- private ExecutorService bootstrapExecutor;
- private volatile BootstrapStatusRunnable bootstrapStatusRunnable;
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- CdcrReplicatorManager(final SolrCore core, String path,
- SolrParams replicatorConfiguration,
- Map> replicasConfiguration) {
- this.core = core;
- this.path = path;
-
- // create states
- replicatorStates = new ArrayList<>();
- String myCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- List targets = replicasConfiguration.get(myCollection);
- if (targets != null) {
- for (SolrParams params : targets) {
- String zkHost = params.get(CdcrParams.ZK_HOST_PARAM);
- String targetCollection = params.get(CdcrParams.TARGET_COLLECTION_PARAM);
-
- CloudSolrClient client = new Builder(Collections.singletonList(zkHost), Optional.empty())
- .withSocketTimeout(30000).withConnectionTimeout(15000)
- .sendUpdatesOnlyToShardLeaders()
- .build();
- client.setDefaultCollection(targetCollection);
- replicatorStates.add(new CdcrReplicatorState(targetCollection, zkHost, client));
- }
- }
-
- this.scheduler = new CdcrReplicatorScheduler(this, replicatorConfiguration);
- }
-
- void setProcessStateManager(final CdcrProcessStateManager processStateManager) {
- this.processStateManager = processStateManager;
- this.processStateManager.register(this);
- }
-
- void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
- this.leaderStateManager = leaderStateManager;
- this.leaderStateManager.register(this);
- }
-
- /**
- *
- * Inform the replicator manager of a change of state, and tell him to update its own state.
- *
- *
- * If we are the leader and the process state is STARTED, we need to initialise the log readers and start the
- * scheduled thread poll.
- * Otherwise, if the process state is STOPPED or if we are not the leader, we need to close the log readers and stop
- * the thread pool.
- *
- *
- * This method is synchronised as it can both be called by the leaderStateManager and the processStateManager.
- *
- */
- @Override
- public synchronized void stateUpdate() {
- if (leaderStateManager.amILeader() && processStateManager.getState().equals(CdcrParams.ProcessState.STARTED)) {
- if (replicatorStates.size() > 0) {
- this.bootstrapExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(replicatorStates.size(),
- new SolrNamedThreadFactory("cdcr-bootstrap-status"));
- }
- this.initLogReaders();
- this.scheduler.start();
- return;
- }
-
- this.scheduler.shutdown();
- if (bootstrapExecutor != null) {
- IOUtils.closeQuietly(bootstrapStatusRunnable);
- ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
- }
- this.closeLogReaders();
- @SuppressWarnings({"rawtypes"})
- Callable callable = core.getSolrCoreState().getCdcrBootstrapCallable();
- if (callable != null) {
- CdcrRequestHandler.BootstrapCallable bootstrapCallable = (CdcrRequestHandler.BootstrapCallable) callable;
- IOUtils.closeQuietly(bootstrapCallable);
- }
- }
-
- List getReplicatorStates() {
- return replicatorStates;
- }
-
- private void initLogReaders() {
- String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
- CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
-
- for (CdcrReplicatorState state : replicatorStates) {
- state.closeLogReader();
- try {
- long checkpoint = this.getCheckpoint(state);
- if (log.isInfoEnabled()) {
- log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
- checkpoint, collectionName, shard);
- }
- CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
- boolean seek = reader.seek(checkpoint);
- state.init(reader);
- if (!seek) {
- // targetVersion is lower than the oldest known entry.
- // In this scenario, it probably means that there is a gap in the updates log.
- // the best we can do here is to bootstrap the target leader by replicating the full index
- final String targetCollection = state.getTargetCollection();
- state.setBootstrapInProgress(true);
- log.info("Attempting to bootstrap target collection: {}, shard: {}", targetCollection, shard);
- bootstrapStatusRunnable = new BootstrapStatusRunnable(core, state);
- log.info("Submitting bootstrap task to executor");
- try {
- bootstrapExecutor.submit(bootstrapStatusRunnable);
- } catch (Exception e) {
- log.error("Unable to submit bootstrap call to executor", e);
- }
- }
- } catch (IOException | SolrServerException | SolrException e) {
- log.warn("Unable to instantiate the log reader for target collection {}", state.getTargetCollection(), e);
- } catch (InterruptedException e) {
- log.warn("Thread interrupted while instantiate the log reader for target collection {}", state.getTargetCollection(), e);
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private long getCheckpoint(CdcrReplicatorState state) throws IOException, SolrServerException {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
-
- @SuppressWarnings({"rawtypes"})
- SolrRequest request = new QueryRequest(params);
- request.setPath(path);
-
- @SuppressWarnings({"rawtypes"})
- NamedList response = state.getClient().request(request);
- return (Long) response.get(CdcrParams.CHECKPOINT);
- }
-
- void closeLogReaders() {
- for (CdcrReplicatorState state : replicatorStates) {
- state.closeLogReader();
- }
- }
-
- /**
- * Shutdown all the {@link org.apache.solr.handler.CdcrReplicatorState} by closing their
- * {@link org.apache.solr.client.solrj.impl.CloudSolrClient} and
- * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
- */
- void shutdown() {
- this.scheduler.shutdown();
- if (bootstrapExecutor != null) {
- IOUtils.closeQuietly(bootstrapStatusRunnable);
- ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
- }
- for (CdcrReplicatorState state : replicatorStates) {
- state.shutdown();
- }
- replicatorStates.clear();
- }
-
- private class BootstrapStatusRunnable implements Runnable, Closeable {
- private final CdcrReplicatorState state;
- private final String targetCollection;
- private final String shard;
- private final String collectionName;
- private final CdcrUpdateLog ulog;
- private final String myCoreUrl;
-
- private volatile boolean closed = false;
-
- BootstrapStatusRunnable(SolrCore core, CdcrReplicatorState state) {
- this.collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- this.shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
- this.ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
- this.state = state;
- this.targetCollection = state.getTargetCollection();
- String baseUrl = core.getCoreContainer().getZkController().getBaseUrl();
- this.myCoreUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName());
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- try {
- Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
- String leaderCoreUrl = leader.getCoreUrl();
- HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
- sendCdcrCommand(client, CdcrParams.CdcrAction.CANCEL_BOOTSTRAP);
- } catch (SolrServerException e) {
- log.error("Error sending cancel bootstrap message to target collection: {} shard: {} leader: {}",
- targetCollection, shard, leaderCoreUrl);
- }
- } catch (InterruptedException e) {
- log.error("Interrupted while closing BootstrapStatusRunnable", e);
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void run() {
- int retries = 1;
- boolean success = false;
- try {
- while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
- Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- }
- TimeOut timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS, TimeSource.NANO_TIME);
- while (!timeOut.hasTimedOut()) {
- if (closed) {
- log.warn("Cancelling waiting for bootstrap on target: {} shard: {} to complete", targetCollection, shard);
- state.setBootstrapInProgress(false);
- break;
- }
- BootstrapStatus status = getBoostrapStatus();
- if (status == BootstrapStatus.RUNNING) {
- try {
- log.info("CDCR bootstrap running for {} seconds, sleeping for {} ms",
- BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS), BOOTSTRAP_RETRY_DELAY_MS);
- timeOut.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- } else if (status == BootstrapStatus.COMPLETED) {
- log.info("CDCR bootstrap successful in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
- long checkpoint = CdcrReplicatorManager.this.getCheckpoint(state);
- if (log.isInfoEnabled()) {
- log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
- checkpoint, collectionName, shard);
- }
- CdcrUpdateLog.CdcrLogReader reader1 = ulog.newLogReader();
- reader1.seek(checkpoint);
- success = true;
- break;
- } else if (status == BootstrapStatus.FAILED) {
- log.warn("CDCR bootstrap failed in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
- // let's retry a fixed number of times before giving up
- if (retries >= MAX_BOOTSTRAP_ATTEMPTS) {
- log.error("Unable to bootstrap the target collection: {}, shard: {} even after {} retries", targetCollection, shard, retries);
- break;
- } else {
- log.info("Retry: {} - Attempting to bootstrap target collection: {} shard: {}", retries, targetCollection, shard);
- while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
- Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- }
- timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS, TimeSource.NANO_TIME); // reset the timer
- retries++;
- }
- } else if (status == BootstrapStatus.NOTFOUND || status == BootstrapStatus.CANCELLED) {
- if (log.isInfoEnabled()) {
- log.info("CDCR bootstrap {} in {} seconds"
- , (status == BootstrapStatus.NOTFOUND ? "not found" : "cancelled")
- , BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
- }
- // the leader of the target shard may have changed and therefore there is no record of the
- // bootstrap process so we must retry the operation
- while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
- Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- }
- retries = 1;
- timeOut = new TimeOut(6L * 3600L * 3600L, TimeUnit.SECONDS, TimeSource.NANO_TIME); // reset the timer
- } else if (status == BootstrapStatus.UNKNOWN || status == BootstrapStatus.SUBMITTED) {
- if (log.isInfoEnabled()) {
- log.info("CDCR bootstrap is {} {}", (status == BootstrapStatus.UNKNOWN ? "unknown" : "submitted"),
- BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
- }
- // we were not able to query the status on the remote end
- // so just sleep for a bit and try again
- timeOut.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- }
- }
- } catch (InterruptedException e) {
- log.info("Bootstrap thread interrupted");
- state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
- Thread.currentThread().interrupt();
- } catch (IOException | SolrServerException | SolrException e) {
- log.error("Unable to bootstrap the target collection {} shard: {}", targetCollection, shard, e);
- state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
- } finally {
- if (success) {
- log.info("Bootstrap successful, giving the go-ahead to replicator");
- state.setBootstrapInProgress(false);
- }
- }
- }
-
- private BootstrapStatus sendBootstrapCommand() throws InterruptedException {
- Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
- String leaderCoreUrl = leader.getCoreUrl();
- HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
- log.info("Attempting to bootstrap target collection: {} shard: {} leader: {}", targetCollection, shard, leaderCoreUrl);
- try {
- @SuppressWarnings({"rawtypes"})
- NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP, ReplicationHandler.LEADER_URL, myCoreUrl);
- log.debug("CDCR Bootstrap response: {}", response);
- String status = response.get(RESPONSE_STATUS).toString();
- return BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
- } catch (Exception e) {
- log.error("Exception submitting bootstrap request", e);
- return BootstrapStatus.UNKNOWN;
- }
- } catch (IOException e) {
- log.error("There shouldn't be an IOException while closing but there was!", e);
- }
- return BootstrapStatus.UNKNOWN;
- }
-
- private BootstrapStatus getBoostrapStatus() throws InterruptedException {
- try {
- Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
- String leaderCoreUrl = leader.getCoreUrl();
- HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
- @SuppressWarnings({"rawtypes"})
- NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
- String status = (String) response.get(RESPONSE_STATUS);
- BootstrapStatus bootstrapStatus = BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
- if (bootstrapStatus == BootstrapStatus.RUNNING) {
- return BootstrapStatus.RUNNING;
- } else if (bootstrapStatus == BootstrapStatus.COMPLETED) {
- return BootstrapStatus.COMPLETED;
- } else if (bootstrapStatus == BootstrapStatus.FAILED) {
- return BootstrapStatus.FAILED;
- } else if (bootstrapStatus == BootstrapStatus.NOTFOUND) {
- log.warn("Bootstrap process was not found on target collection: {} shard: {}, leader: {}", targetCollection, shard, leaderCoreUrl);
- return BootstrapStatus.NOTFOUND;
- } else if (bootstrapStatus == BootstrapStatus.CANCELLED) {
- return BootstrapStatus.CANCELLED;
- } else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unknown status: " + status + " returned by BOOTSTRAP_STATUS command");
- }
- }
- } catch (Exception e) {
- log.error("Exception during bootstrap status request", e);
- return BootstrapStatus.UNKNOWN;
- }
- }
- }
-
- @SuppressWarnings({"rawtypes"})
- private NamedList sendCdcrCommand(SolrClient client, CdcrParams.CdcrAction action, String... params) throws SolrServerException, IOException {
- ModifiableSolrParams solrParams = new ModifiableSolrParams();
- solrParams.set(CommonParams.QT, "/cdcr");
- solrParams.set(CommonParams.ACTION, action.toString());
- for (int i = 0; i < params.length - 1; i+=2) {
- solrParams.set(params[i], params[i + 1]);
- }
- SolrRequest request = new QueryRequest(solrParams);
- return client.request(request);
- }
-
- private enum BootstrapStatus {
- SUBMITTED,
- RUNNING,
- COMPLETED,
- FAILED,
- NOTFOUND,
- CANCELLED,
- UNKNOWN
- }
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
deleted file mode 100644
index 14184651b07..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.*;
-
-/**
- * Schedule the execution of the {@link org.apache.solr.handler.CdcrReplicator} threads at
- * regular time interval. It relies on a queue of {@link org.apache.solr.handler.CdcrReplicatorState} in
- * order to avoid that one {@link org.apache.solr.handler.CdcrReplicatorState} is used by two threads at the same
- * time.
- */
-class CdcrReplicatorScheduler {
-
- private boolean isStarted = false;
-
- private ScheduledExecutorService scheduler;
- private ExecutorService replicatorsPool;
-
- private final CdcrReplicatorManager replicatorManager;
- private final ConcurrentLinkedQueue statesQueue;
-
- private int poolSize = DEFAULT_POOL_SIZE;
- private int timeSchedule = DEFAULT_TIME_SCHEDULE;
- private int batchSize = DEFAULT_BATCH_SIZE;
-
- private static final int DEFAULT_POOL_SIZE = 2;
- private static final int DEFAULT_TIME_SCHEDULE = 10;
- private static final int DEFAULT_BATCH_SIZE = 128;
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- CdcrReplicatorScheduler(final CdcrReplicatorManager replicatorStatesManager, final SolrParams replicatorConfiguration) {
- this.replicatorManager = replicatorStatesManager;
- this.statesQueue = new ConcurrentLinkedQueue<>(replicatorManager.getReplicatorStates());
- if (replicatorConfiguration != null) {
- poolSize = replicatorConfiguration.getInt(CdcrParams.THREAD_POOL_SIZE_PARAM, DEFAULT_POOL_SIZE);
- timeSchedule = replicatorConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
- batchSize = replicatorConfiguration.getInt(CdcrParams.BATCH_SIZE_PARAM, DEFAULT_BATCH_SIZE);
- }
- }
-
- void start() {
- if (!isStarted) {
- scheduler = Executors.newSingleThreadScheduledExecutor(new SolrNamedThreadFactory("cdcr-scheduler"));
- replicatorsPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new SolrNamedThreadFactory("cdcr-replicator"));
-
- // the scheduler thread is executed every second and submits one replication task
- // per available state in the queue
- scheduler.scheduleWithFixedDelay(() -> {
- int nCandidates = statesQueue.size();
- for (int i = 0; i < nCandidates; i++) {
- // a thread that poll one state from the queue, execute the replication task, and push back
- // the state in the queue when the task is completed
- replicatorsPool.execute(() -> {
- CdcrReplicatorState state = statesQueue.poll();
- assert state != null; // Should never happen
- try {
- if (!state.isBootstrapInProgress()) {
- new CdcrReplicator(state, batchSize).run();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Replicator state is bootstrapping, skipping replication for target collection {}", state.getTargetCollection());
- }
- }
- } finally {
- statesQueue.offer(state);
- }
- });
-
- }
- }, 0, timeSchedule, TimeUnit.MILLISECONDS);
- isStarted = true;
- }
- }
-
- void shutdown() {
- if (isStarted) {
- // interrupts are often dangerous in Lucene / Solr code, but the
- // test for this will leak threads without
- replicatorsPool.shutdown();
- try {
- replicatorsPool.awaitTermination(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Thread interrupted while waiting for CDCR replicator threadpool close.");
- Thread.currentThread().interrupt();
- } finally {
- scheduler.shutdownNow();
- isStarted = false;
- }
- }
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
deleted file mode 100644
index af9020ae5e4..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.update.UpdateLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The state of the replication with a target cluster.
- */
-class CdcrReplicatorState {
-
- private final String targetCollection;
- private final String zkHost;
- private final CloudSolrClient targetClient;
-
- private CdcrUpdateLog.CdcrLogReader logReader;
-
- private long consecutiveErrors = 0;
- private final Map errorCounters = new HashMap<>();
- private final FixedQueue errorsQueue = new FixedQueue<>(100); // keep the last 100 errors
-
- private BenchmarkTimer benchmarkTimer;
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final AtomicBoolean bootstrapInProgress = new AtomicBoolean(false);
- private final AtomicInteger numBootstraps = new AtomicInteger();
-
- CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
- this.targetCollection = targetCollection;
- this.targetClient = targetClient;
- this.zkHost = zkHost;
- this.benchmarkTimer = new BenchmarkTimer();
- }
-
- /**
- * Initialise the replicator state with a {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}
- * that is positioned at the last target cluster checkpoint.
- */
- void init(final CdcrUpdateLog.CdcrLogReader logReader) {
- this.logReader = logReader;
- }
-
- void closeLogReader() {
- if (logReader != null) {
- logReader.close();
- logReader = null;
- }
- }
-
- CdcrUpdateLog.CdcrLogReader getLogReader() {
- return logReader;
- }
-
- String getTargetCollection() {
- return targetCollection;
- }
-
- String getZkHost() {
- return zkHost;
- }
-
- CloudSolrClient getClient() {
- return targetClient;
- }
-
- void shutdown() {
- try {
- targetClient.close();
- } catch (IOException ioe) {
- log.warn("Caught exception trying to close server: ", ioe);
- }
- this.closeLogReader();
- }
-
- void reportError(ErrorType error) {
- if (!errorCounters.containsKey(error)) {
- errorCounters.put(error, 0l);
- }
- errorCounters.put(error, errorCounters.get(error) + 1);
- errorsQueue.add(new ErrorQueueEntry(error, new Date()));
- consecutiveErrors++;
- }
-
- void resetConsecutiveErrors() {
- consecutiveErrors = 0;
- }
-
- /**
- * Returns the number of consecutive errors encountered while trying to forward updates to the target.
- */
- long getConsecutiveErrors() {
- return consecutiveErrors;
- }
-
- /**
- * Gets the number of errors of a particular type.
- */
- long getErrorCount(ErrorType type) {
- if (errorCounters.containsKey(type)) {
- return errorCounters.get(type);
- } else {
- return 0;
- }
- }
-
- /**
- * Gets the last errors ordered by timestamp (most recent first)
- */
- List getLastErrors() {
- List lastErrors = new ArrayList<>();
- synchronized (errorsQueue) {
- Iterator it = errorsQueue.iterator();
- while (it.hasNext()) {
- ErrorQueueEntry entry = it.next();
- lastErrors.add(new String[]{entry.timestamp.toInstant().toString(), entry.type.toLower()});
- }
- }
- return lastErrors;
- }
-
- /**
- * Return the timestamp of the last processed operations
- */
- String getTimestampOfLastProcessedOperation() {
- if (logReader != null && logReader.getLastVersion() != -1) {
- // Shift back to the right by 20 bits the version number - See VersionInfo#getNewClock
- return Instant.ofEpochMilli(logReader.getLastVersion() >> 20).toString();
- }
- return "";
- }
-
- /**
- * Gets the benchmark timer.
- */
- BenchmarkTimer getBenchmarkTimer() {
- return this.benchmarkTimer;
- }
-
- /**
- * @return true if a bootstrap operation is in progress, false otherwise
- */
- boolean isBootstrapInProgress() {
- return bootstrapInProgress.get();
- }
-
- void setBootstrapInProgress(boolean inProgress) {
- if (bootstrapInProgress.compareAndSet(true, false)) {
- numBootstraps.incrementAndGet();
- }
- bootstrapInProgress.set(inProgress);
- }
-
- public int getNumBootstraps() {
- return numBootstraps.get();
- }
-
- enum ErrorType {
- INTERNAL,
- BAD_REQUEST;
-
- public String toLower() {
- return toString().toLowerCase(Locale.ROOT);
- }
-
- }
-
- static class BenchmarkTimer {
-
- private long startTime;
- private long runTime = 0;
- private Map opCounters = new HashMap<>();
-
- /**
- * Start recording time.
- */
- void start() {
- startTime = System.nanoTime();
- }
-
- /**
- * Stop recording time.
- */
- void stop() {
- runTime += System.nanoTime() - startTime;
- startTime = -1;
- }
-
- void incrementCounter(final int operationType) {
- switch (operationType) {
- case UpdateLog.ADD:
- case UpdateLog.DELETE:
- case UpdateLog.DELETE_BY_QUERY: {
- if (!opCounters.containsKey(operationType)) {
- opCounters.put(operationType, 0l);
- }
- opCounters.put(operationType, opCounters.get(operationType) + 1);
- return;
- }
-
- default:
- }
- }
-
- long getRunTime() {
- long totalRunTime = runTime;
- if (startTime != -1) { // we are currently recording the time
- totalRunTime += System.nanoTime() - startTime;
- }
- return totalRunTime;
- }
-
- double getOperationsPerSecond() {
- long total = 0;
- for (long counter : opCounters.values()) {
- total += counter;
- }
- double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
- return total / elapsedTimeInSeconds;
- }
-
- double getAddsPerSecond() {
- long total = opCounters.get(UpdateLog.ADD) != null ? opCounters.get(UpdateLog.ADD) : 0;
- double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
- return total / elapsedTimeInSeconds;
- }
-
- double getDeletesPerSecond() {
- long total = opCounters.get(UpdateLog.DELETE) != null ? opCounters.get(UpdateLog.DELETE) : 0;
- total += opCounters.get(UpdateLog.DELETE_BY_QUERY) != null ? opCounters.get(UpdateLog.DELETE_BY_QUERY) : 0;
- double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
- return total / elapsedTimeInSeconds;
- }
-
- }
-
- private static class ErrorQueueEntry {
-
- private ErrorType type;
- private Date timestamp;
-
- private ErrorQueueEntry(ErrorType type, Date timestamp) {
- this.type = type;
- this.timestamp = timestamp;
- }
- }
-
- private static class FixedQueue extends LinkedList {
-
- private int maxSize;
-
- public FixedQueue(int maxSize) {
- this.maxSize = maxSize;
- }
-
- @Override
- public synchronized boolean add(E e) {
- super.addFirst(e);
- if (size() > maxSize) {
- removeLast();
- }
- return true;
- }
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java b/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
deleted file mode 100644
index a9dfeaed8f3..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
+++ /dev/null
@@ -1,880 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.stream.Collectors;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.cloud.ZkShardTerms;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
-import org.apache.solr.core.CloseHook;
-import org.apache.solr.core.PluginBag;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.update.SolrCoreState;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.VersionInfo;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.util.plugin.SolrCoreAware;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
-import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_MESSAGE;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RUNNING;
-
-/**
- *
- * This request handler implements the CDCR API and is responsible of the execution of the
- * {@link CdcrReplicator} threads.
- *
- *
- * It relies on three classes, {@link org.apache.solr.handler.CdcrLeaderStateManager},
- * {@link org.apache.solr.handler.CdcrBufferStateManager} and {@link org.apache.solr.handler.CdcrProcessStateManager}
- * to synchronise the state of the CDCR across all the nodes.
- *
- *
- * The CDCR process can be either {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED} or {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED} by using the
- * actions {@link org.apache.solr.handler.CdcrParams.CdcrAction#STOP} and {@link org.apache.solr.handler.CdcrParams.CdcrAction#START} respectively. If a node is leader and the process
- * state is {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED}, the {@link CdcrReplicatorManager} will
- * start the {@link CdcrReplicator} threads. If a node becomes non-leader or if the process state becomes
- * {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED}, the {@link CdcrReplicator} threads are stopped.
- *
- *
- * The CDCR can be switched to a "buffering" mode, in which the update log will never delete old transaction log
- * files. Such a mode can be enabled or disabled using the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#ENABLEBUFFER} and
- * {@link org.apache.solr.handler.CdcrParams.CdcrAction#DISABLEBUFFER} respectively.
- *
- *
- * Known limitations: The source and target clusters must have the same topology. Replication between clusters
- * with a different number of shards will likely results in an inconsistent index.
- *
- * @deprecated since 8.6
- */
-@Deprecated(since = "8.6")
-public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAware {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private SolrCore core;
- private String collection;
- private String shard;
- private String path;
-
- private SolrParams updateLogSynchronizerConfiguration;
- private SolrParams replicatorConfiguration;
- private SolrParams bufferConfiguration;
- private Map> replicasConfiguration;
-
- private CdcrProcessStateManager processStateManager;
- private CdcrBufferStateManager bufferStateManager;
- private CdcrReplicatorManager replicatorManager;
- private CdcrLeaderStateManager leaderStateManager;
- private CdcrUpdateLogSynchronizer updateLogSynchronizer;
- private CdcrBufferManager bufferManager;
-
- @Override
- public void init(@SuppressWarnings({"rawtypes"})NamedList args) {
- super.init(args);
-
- log.warn("CDCR (in its current form) is deprecated as of 8.6 and shall be removed in 9.0. See SOLR-14022 for details.");
-
- if (args != null) {
- // Configuration of the Update Log Synchronizer
- Object updateLogSynchonizerParam = args.get(CdcrParams.UPDATE_LOG_SYNCHRONIZER_PARAM);
- if (updateLogSynchonizerParam != null && updateLogSynchonizerParam instanceof NamedList) {
- updateLogSynchronizerConfiguration = ((NamedList) updateLogSynchonizerParam).toSolrParams();
- }
-
- // Configuration of the Replicator
- Object replicatorParam = args.get(CdcrParams.REPLICATOR_PARAM);
- if (replicatorParam != null && replicatorParam instanceof NamedList) {
- replicatorConfiguration = ((NamedList) replicatorParam).toSolrParams();
- }
-
- // Configuration of the Buffer
- Object bufferParam = args.get(CdcrParams.BUFFER_PARAM);
- if (bufferParam != null && bufferParam instanceof NamedList) {
- bufferConfiguration = ((NamedList) bufferParam).toSolrParams();
- }
-
- // Configuration of the Replicas
- replicasConfiguration = new HashMap<>();
- @SuppressWarnings({"rawtypes"})
- List replicas = args.getAll(CdcrParams.REPLICA_PARAM);
- for (Object replica : replicas) {
- if (replica != null && replica instanceof NamedList) {
- SolrParams params = ((NamedList) replica).toSolrParams();
- if (!replicasConfiguration.containsKey(params.get(CdcrParams.SOURCE_COLLECTION_PARAM))) {
- replicasConfiguration.put(params.get(CdcrParams.SOURCE_COLLECTION_PARAM), new ArrayList<>());
- }
- replicasConfiguration.get(params.get(CdcrParams.SOURCE_COLLECTION_PARAM)).add(params);
- }
- }
- }
- }
-
- @Override
- public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
- // Pick the action
- SolrParams params = req.getParams();
- CdcrParams.CdcrAction action = null;
- String a = params.get(CommonParams.ACTION);
- if (a != null) {
- action = CdcrParams.CdcrAction.get(a);
- }
- if (action == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
- }
-
- switch (action) {
- case START: {
- this.handleStartAction(req, rsp);
- break;
- }
- case STOP: {
- this.handleStopAction(req, rsp);
- break;
- }
- case STATUS: {
- this.handleStatusAction(req, rsp);
- break;
- }
- case COLLECTIONCHECKPOINT: {
- this.handleCollectionCheckpointAction(req, rsp);
- break;
- }
- case SHARDCHECKPOINT: {
- this.handleShardCheckpointAction(req, rsp);
- break;
- }
- case ENABLEBUFFER: {
- this.handleEnableBufferAction(req, rsp);
- break;
- }
- case DISABLEBUFFER: {
- this.handleDisableBufferAction(req, rsp);
- break;
- }
- case LASTPROCESSEDVERSION: {
- this.handleLastProcessedVersionAction(req, rsp);
- break;
- }
- case QUEUES: {
- this.handleQueuesAction(req, rsp);
- break;
- }
- case OPS: {
- this.handleOpsAction(req, rsp);
- break;
- }
- case ERRORS: {
- this.handleErrorsAction(req, rsp);
- break;
- }
- case BOOTSTRAP: {
- this.handleBootstrapAction(req, rsp);
- break;
- }
- case BOOTSTRAP_STATUS: {
- this.handleBootstrapStatus(req, rsp);
- break;
- }
- case CANCEL_BOOTSTRAP: {
- this.handleCancelBootstrap(req, rsp);
- break;
- }
- default: {
- throw new RuntimeException("Unknown action: " + action);
- }
- }
-
- rsp.setHttpCaching(false);
- }
-
- @Override
- public void inform(SolrCore core) {
- this.core = core;
- collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-
- // Make sure that the core is ZKAware
- if (!core.getCoreContainer().isZooKeeperAware()) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Solr instance is not running in SolrCloud mode.");
- }
-
- // Make sure that the core is using the CdcrUpdateLog implementation
- if (!(core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog)) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Solr instance is not configured with the cdcr update log.");
- }
-
- // Find the registered path of the handler
- path = null;
- for (Map.Entry> entry : core.getRequestHandlers().getRegistry().entrySet()) {
- if (core.getRequestHandlers().isLoaded(entry.getKey()) && entry.getValue().get() == this) {
- path = entry.getKey();
- break;
- }
- }
- if (path == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "The CdcrRequestHandler is not registered with the current core.");
- }
- if (!path.startsWith("/")) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "The CdcrRequestHandler needs to be registered to a path. Typically this is '/cdcr'");
- }
-
- // Initialisation phase
- // If the Solr cloud is being initialised, each CDCR node will start up in its default state, i.e., STOPPED
- // and non-leader. The leader state will be updated later, when all the Solr cores have been loaded.
- // If the Solr cloud has already been initialised, and the core is reloaded (i.e., because a node died or a new node
- // is added to the cluster), the CDCR node will synchronise its state with the global CDCR state that is stored
- // in zookeeper.
-
- // Initialise the buffer state manager
- bufferStateManager = new CdcrBufferStateManager(core, bufferConfiguration);
- // Initialise the process state manager
- processStateManager = new CdcrProcessStateManager(core);
- // Initialise the leader state manager
- leaderStateManager = new CdcrLeaderStateManager(core);
-
- // Initialise the replicator states manager
- replicatorManager = new CdcrReplicatorManager(core, path, replicatorConfiguration, replicasConfiguration);
- replicatorManager.setProcessStateManager(processStateManager);
- replicatorManager.setLeaderStateManager(leaderStateManager);
- // we need to inform it of a state event since the process and leader state
- // may have been synchronised during the initialisation
- replicatorManager.stateUpdate();
-
- // Initialise the update log synchronizer
- updateLogSynchronizer = new CdcrUpdateLogSynchronizer(core, path, updateLogSynchronizerConfiguration);
- updateLogSynchronizer.setLeaderStateManager(leaderStateManager);
- // we need to inform it of a state event since the leader state
- // may have been synchronised during the initialisation
- updateLogSynchronizer.stateUpdate();
-
- // Initialise the buffer manager
- bufferManager = new CdcrBufferManager(core);
- bufferManager.setLeaderStateManager(leaderStateManager);
- bufferManager.setBufferStateManager(bufferStateManager);
- // we need to inform it of a state event since the leader state
- // may have been synchronised during the initialisation
- bufferManager.stateUpdate();
-
- // register the close hook
- this.registerCloseHook(core);
- }
-
- /**
- * register a close hook to properly shutdown the state manager and scheduler
- */
- private void registerCloseHook(SolrCore core) {
- core.addCloseHook(new CloseHook() {
-
- @Override
- public void preClose(SolrCore core) {
- log.info("Solr core is being closed - shutting down CDCR handler @ {}:{}", collection, shard);
-
- updateLogSynchronizer.shutdown();
- replicatorManager.shutdown();
- bufferStateManager.shutdown();
- processStateManager.shutdown();
- leaderStateManager.shutdown();
- }
-
- @Override
- public void postClose(SolrCore core) {
- }
-
- });
- }
-
- /**
- *
- * Update and synchronize the process state.
- *
- *
- * The process state manager must notify the replicator states manager of the change of state.
- *
- */
- private void handleStartAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (processStateManager.getState() == CdcrParams.ProcessState.STOPPED) {
- processStateManager.setState(CdcrParams.ProcessState.STARTED);
- processStateManager.synchronize();
- }
-
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- private void handleStopAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (processStateManager.getState() == CdcrParams.ProcessState.STARTED) {
- processStateManager.setState(CdcrParams.ProcessState.STOPPED);
- processStateManager.synchronize();
- }
-
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- private void handleStatusAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private NamedList getStatus() {
- NamedList status = new NamedList();
- status.add(CdcrParams.ProcessState.getParam(), processStateManager.getState().toLower());
- status.add(CdcrParams.BufferState.getParam(), bufferStateManager.getState().toLower());
- return status;
- }
-
- /**
- * This action is generally executed on the target cluster in order to retrieve the latest update checkpoint.
- * This checkpoint is used on the source cluster to setup the
- * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader} of a shard leader.
- * This method will execute in parallel one
- * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} request per shard leader. It will
- * then pick the lowest version number as checkpoint. Picking the lowest amongst all shards will ensure that we do not
- * pick a checkpoint that is ahead of the source cluster. This can occur when other shard leaders are sending new
- * updates to the target cluster while we are currently instantiating the
- * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
- * This solution only works in scenarios where the topology of the source and target clusters are identical.
- */
- private void handleCollectionCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp)
- throws IOException, SolrServerException {
- ZkController zkController = core.getCoreContainer().getZkController();
- try {
- zkController.getZkStateReader().forceUpdateCollection(collection);
- } catch (Exception e) {
- log.warn("Error when updating cluster state", e);
- }
- ClusterState cstate = zkController.getClusterState();
- DocCollection docCollection = cstate.getCollectionOrNull(collection);
- Collection shards = docCollection == null? null : docCollection.getActiveSlices();
-
- ExecutorService parallelExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("parallelCdcrExecutor"));
-
- long checkpoint = Long.MAX_VALUE;
- try {
- List> callables = new ArrayList<>();
- for (Slice shard : shards) {
- ZkNodeProps leaderProps = zkController.getZkStateReader().getLeaderRetry(collection, shard.getName());
- ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
- callables.add(new SliceCheckpointCallable(nodeProps.getCoreUrl(), path));
- }
-
- for (final Future future : parallelExecutor.invokeAll(callables)) {
- long version = future.get();
- if (version < checkpoint) { // we must take the lowest checkpoint from all the shards
- checkpoint = version;
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Error while requesting shard's checkpoints", e);
- } catch (ExecutionException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Error while requesting shard's checkpoints", e);
- } finally {
- parallelExecutor.shutdown();
- }
-
- rsp.add(CdcrParams.CHECKPOINT, checkpoint);
- }
-
- /**
- * Retrieve the version number of the latest entry of the {@link org.apache.solr.update.UpdateLog}.
- */
- private void handleShardCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (!leaderStateManager.amILeader()) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
- "' sent to non-leader replica");
- }
-
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- VersionInfo versionInfo = ulog.getVersionInfo();
- try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
- long maxVersionFromRecent = recentUpdates.getMaxRecentVersion();
- long maxVersionFromIndex = versionInfo.getMaxVersionFromIndex(req.getSearcher());
- log.info("Found maxVersionFromRecent {} maxVersionFromIndex {}", maxVersionFromRecent, maxVersionFromIndex);
- // there is no race with ongoing bootstrap because we don't expect any updates to come from the source
- long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
- if (maxVersion == 0L) {
- maxVersion = -1;
- }
- rsp.add(CdcrParams.CHECKPOINT, maxVersion);
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
- "' could not read max version");
- }
- }
-
- private void handleEnableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (bufferStateManager.getState() == CdcrParams.BufferState.DISABLED) {
- bufferStateManager.setState(CdcrParams.BufferState.ENABLED);
- bufferStateManager.synchronize();
- }
-
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- private void handleDisableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (bufferStateManager.getState() == CdcrParams.BufferState.ENABLED) {
- bufferStateManager.setState(CdcrParams.BufferState.DISABLED);
- bufferStateManager.synchronize();
- }
-
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- /**
- *
- * We have to take care of four cases:
- *
- *
Replication & Buffering
- *
Replication & No Buffering
- *
No Replication & Buffering
- *
No Replication & No Buffering
- *
- * In the first three cases, at least one log reader should have been initialised. We should take the lowest
- * last processed version across all the initialised readers. In the last case, there isn't a log reader
- * initialised. We should instantiate one and get the version of the first entries.
- *
- */
- private void handleLastProcessedVersionAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-
- if (!leaderStateManager.amILeader()) {
- log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.LASTPROCESSEDVERSION, collectionName, shard);
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.LASTPROCESSEDVERSION +
- " sent to non-leader replica");
- }
-
- // take care of the first three cases
- // first check the log readers from the replicator states
- long lastProcessedVersion = Long.MAX_VALUE;
- for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
- long version = Long.MAX_VALUE;
- if (state.getLogReader() != null) {
- version = state.getLogReader().getLastVersion();
- }
- lastProcessedVersion = Math.min(lastProcessedVersion, version);
- }
-
- // next check the log reader of the buffer
- CdcrUpdateLog.CdcrLogReader bufferLogReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).getBufferToggle();
- if (bufferLogReader != null) {
- lastProcessedVersion = Math.min(lastProcessedVersion, bufferLogReader.getLastVersion());
- }
-
- // the fourth case: no cdc replication, no buffering: all readers were null
- if (processStateManager.getState().equals(CdcrParams.ProcessState.STOPPED) &&
- bufferStateManager.getState().equals(CdcrParams.BufferState.DISABLED)) {
- CdcrUpdateLog.CdcrLogReader logReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).newLogReader();
- try {
- // let the reader initialize lastVersion
- logReader.next();
- lastProcessedVersion = Math.min(lastProcessedVersion, logReader.getLastVersion());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Error while fetching the last processed version", e);
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Error while fetching the last processed version", e);
- } finally {
- logReader.close();
- }
- }
-
- log.debug("Returning the lowest last processed version {} @ {}:{}", lastProcessedVersion, collectionName, shard);
- rsp.add(CdcrParams.LAST_PROCESSED_VERSION, lastProcessedVersion);
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private void handleQueuesAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- NamedList hosts = new NamedList();
-
- for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
- NamedList queueStats = new NamedList();
-
- CdcrUpdateLog.CdcrLogReader logReader = state.getLogReader();
- if (logReader == null) {
- String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
- log.warn("The log reader for target collection {} is not initialised @ {}:{}",
- state.getTargetCollection(), collectionName, shard);
- queueStats.add(CdcrParams.QUEUE_SIZE, -1l);
- } else {
- queueStats.add(CdcrParams.QUEUE_SIZE, logReader.getNumberOfRemainingRecords());
- }
- queueStats.add(CdcrParams.LAST_TIMESTAMP, state.getTimestampOfLastProcessedOperation());
-
- if (hosts.get(state.getZkHost()) == null) {
- hosts.add(state.getZkHost(), new NamedList());
- }
- ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), queueStats);
- }
-
- rsp.add(CdcrParams.QUEUES, hosts);
- UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
- rsp.add(CdcrParams.TLOG_TOTAL_SIZE, updateLog.getTotalLogsSize());
- rsp.add(CdcrParams.TLOG_TOTAL_COUNT, updateLog.getTotalLogsNumber());
- rsp.add(CdcrParams.UPDATE_LOG_SYNCHRONIZER,
- updateLogSynchronizer.isStarted() ? CdcrParams.ProcessState.STARTED.toLower() : CdcrParams.ProcessState.STOPPED.toLower());
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private void handleOpsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- NamedList hosts = new NamedList();
-
- for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
- NamedList ops = new NamedList();
- ops.add(CdcrParams.COUNTER_ALL, state.getBenchmarkTimer().getOperationsPerSecond());
- ops.add(CdcrParams.COUNTER_ADDS, state.getBenchmarkTimer().getAddsPerSecond());
- ops.add(CdcrParams.COUNTER_DELETES, state.getBenchmarkTimer().getDeletesPerSecond());
-
- if (hosts.get(state.getZkHost()) == null) {
- hosts.add(state.getZkHost(), new NamedList());
- }
- ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), ops);
- }
-
- rsp.add(CdcrParams.OPERATIONS_PER_SECOND, hosts);
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private void handleErrorsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- NamedList hosts = new NamedList();
-
- for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
- NamedList errors = new NamedList();
-
- errors.add(CdcrParams.CONSECUTIVE_ERRORS, state.getConsecutiveErrors());
- errors.add(CdcrReplicatorState.ErrorType.BAD_REQUEST.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.BAD_REQUEST));
- errors.add(CdcrReplicatorState.ErrorType.INTERNAL.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.INTERNAL));
-
- NamedList lastErrors = new NamedList();
- for (String[] lastError : state.getLastErrors()) {
- lastErrors.add(lastError[0], lastError[1]);
- }
- errors.add(CdcrParams.LAST, lastErrors);
-
- if (hosts.get(state.getZkHost()) == null) {
- hosts.add(state.getZkHost(), new NamedList());
- }
- ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), errors);
- }
-
- rsp.add(CdcrParams.ERRORS, hosts);
- }
-
- private void handleBootstrapAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, InterruptedException, SolrServerException {
- String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
- if (!leaderStateManager.amILeader()) {
- log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.BOOTSTRAP +
- " sent to non-leader replica");
- }
- CountDownLatch latch = new CountDownLatch(1); // latch to make sure BOOTSTRAP_STATUS gives correct response
-
- Runnable runnable = () -> {
- Lock recoveryLock = req.getCore().getSolrCoreState().getRecoveryLock();
- boolean locked = recoveryLock.tryLock();
- SolrCoreState coreState = core.getSolrCoreState();
- try {
- if (!locked) {
- handleCancelBootstrap(req, rsp);
- } else if (leaderStateManager.amILeader()) {
- coreState.setCdcrBootstrapRunning(true);
- latch.countDown(); // free the latch as current bootstrap is executing
- //running.set(true);
- String leaderUrl = ReplicationHandler.getObjectWithBackwardCompatibility(req.getParams(), ReplicationHandler.LEADER_URL, ReplicationHandler.LEGACY_LEADER_URL, null);
- BootstrapCallable bootstrapCallable = new BootstrapCallable(leaderUrl, core);
- coreState.setCdcrBootstrapCallable(bootstrapCallable);
- Future bootstrapFuture = core.getCoreContainer().getUpdateShardHandler().getRecoveryExecutor()
- .submit(bootstrapCallable);
- coreState.setCdcrBootstrapFuture(bootstrapFuture);
- try {
- bootstrapFuture.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("Bootstrap was interrupted", e);
- } catch (ExecutionException e) {
- log.error("Bootstrap operation failed", e);
- }
- } else {
- log.error("Action {} sent to non-leader replica @ {}:{}. Aborting bootstrap.", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
- }
- } finally {
- if (locked) {
- coreState.setCdcrBootstrapRunning(false);
- recoveryLock.unlock();
- } else {
- latch.countDown(); // free the latch as current bootstrap is executing
- }
- }
- };
-
- try {
- core.getCoreContainer().getUpdateShardHandler().getUpdateExecutor().submit(runnable);
- rsp.add(RESPONSE_STATUS, "submitted");
- latch.await(10000, TimeUnit.MILLISECONDS); // put the latch for current bootstrap command
- } catch (RejectedExecutionException ree) {
- // no problem, we're probably shutting down
- rsp.add(RESPONSE_STATUS, "failed");
- }
- }
-
- private void handleCancelBootstrap(SolrQueryRequest req, SolrQueryResponse rsp) {
- BootstrapCallable callable = (BootstrapCallable)core.getSolrCoreState().getCdcrBootstrapCallable();
- IOUtils.closeQuietly(callable);
- rsp.add(RESPONSE_STATUS, "cancelled");
- }
-
- private void handleBootstrapStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
- SolrCoreState coreState = core.getSolrCoreState();
- if (coreState.getCdcrBootstrapRunning()) {
- rsp.add(RESPONSE_STATUS, RUNNING);
- return;
- }
-
- Future future = coreState.getCdcrBootstrapFuture();
- BootstrapCallable callable = (BootstrapCallable)coreState.getCdcrBootstrapCallable();
- if (future == null) {
- rsp.add(RESPONSE_STATUS, "notfound");
- rsp.add(RESPONSE_MESSAGE, "No bootstrap found in running, completed or failed states");
- } else if (future.isCancelled() || callable.isClosed()) {
- rsp.add(RESPONSE_STATUS, "cancelled");
- } else if (future.isDone()) {
- // could be a normal termination or an exception
- try {
- Boolean result = future.get();
- if (result) {
- rsp.add(RESPONSE_STATUS, COMPLETED);
- } else {
- rsp.add(RESPONSE_STATUS, FAILED);
- }
- } catch (InterruptedException e) {
- // should not happen?
- } catch (ExecutionException e) {
- rsp.add(RESPONSE_STATUS, FAILED);
- rsp.add(RESPONSE, e);
- } catch (CancellationException ce) {
- rsp.add(RESPONSE_STATUS, FAILED);
- rsp.add(RESPONSE_MESSAGE, "Bootstrap was cancelled");
- }
- } else {
- rsp.add(RESPONSE_STATUS, RUNNING);
- }
- }
-
- static class BootstrapCallable implements Callable, Closeable {
- private final String leaderUrl;
- private final SolrCore core;
- private volatile boolean closed = false;
-
- BootstrapCallable(String leaderUrl, SolrCore core) {
- this.leaderUrl = leaderUrl;
- this.core = core;
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
- ReplicationHandler replicationHandler = (ReplicationHandler) handler;
- replicationHandler.abortFetch();
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- @Override
- public Boolean call() throws Exception {
- boolean success = false;
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- // we start buffering updates as a safeguard however we do not expect
- // to receive any updates from the source during bootstrap
- ulog.bufferUpdates();
- try {
- commitOnLeader(leaderUrl);
- // use rep handler directly, so we can do this sync rather than async
- SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
- ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-
- if (replicationHandler == null) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
- }
-
- ModifiableSolrParams solrParams = new ModifiableSolrParams();
- solrParams.set(ReplicationHandler.LEADER_URL, leaderUrl);
- // we do not want the raw tlog files from the source
- solrParams.set(ReplicationHandler.TLOG_FILES, false);
-
- success = replicationHandler.doFetch(solrParams, false).getSuccessful();
-
- Future future = ulog.applyBufferedUpdates();
- if (future == null) {
- // no replay needed
- log.info("No replay needed.");
- } else {
- log.info("Replaying buffered documents.");
- // wait for replay
- UpdateLog.RecoveryInfo report = future.get();
- if (report.failed) {
- SolrException.log(log, "Replay failed");
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
- }
- }
- if (success) {
- ZkController zkController = core.getCoreContainer().getZkController();
- String collectionName = core.getCoreDescriptor().getCollectionName();
- ClusterState clusterState = zkController.getZkStateReader().getClusterState();
- DocCollection collection = clusterState.getCollection(collectionName);
- Slice slice = collection.getSlice(core.getCoreDescriptor().getCloudDescriptor().getShardId());
- ZkShardTerms terms = zkController.getShardTerms(collectionName, slice.getName());
- String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
- Set allExceptLeader = slice.getReplicas().stream().filter(replica -> !replica.getName().equals(coreNodeName)).map(Replica::getName).collect(Collectors.toSet());
- terms.ensureTermsIsHigher(coreNodeName, allExceptLeader);
- }
- return success;
- } finally {
- if (closed || !success) {
- // we cannot apply the buffer in this case because it will introduce newer versions in the
- // update log and then the source cluster will get those versions via collectioncheckpoint
- // causing the versions in between to be completely missed
- boolean dropped = ulog.dropBufferedUpdates();
- assert dropped;
- }
- }
- }
-
- private void commitOnLeader(String leaderUrl) throws SolrServerException,
- IOException {
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl)
- .withConnectionTimeout(30000)
- .build()) {
- UpdateRequest ureq = new UpdateRequest();
- ureq.setParams(new ModifiableSolrParams());
- ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
- ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
- ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
- client);
- }
- }
- }
-
- @Override
- public String getDescription() {
- return "Manage Cross Data Center Replication";
- }
-
- @Override
- public Category getCategory() {
- return Category.REPLICATION;
- }
-
- /**
- * A thread subclass for executing a single
- * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} action.
- */
- private static final class SliceCheckpointCallable implements Callable {
-
- final String baseUrl;
- final String cdcrPath;
-
- SliceCheckpointCallable(final String baseUrl, final String cdcrPath) {
- this.baseUrl = baseUrl;
- this.cdcrPath = cdcrPath;
- }
-
- @Override
- public Long call() throws Exception {
- try (HttpSolrClient server = new HttpSolrClient.Builder(baseUrl)
- .withConnectionTimeout(15000)
- .withSocketTimeout(60000)
- .build()) {
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CommonParams.ACTION, CdcrParams.CdcrAction.SHARDCHECKPOINT.toString());
-
- @SuppressWarnings({"rawtypes"})
- SolrRequest request = new QueryRequest(params);
- request.setPath(cdcrPath);
-
- @SuppressWarnings({"rawtypes"})
- NamedList response = server.request(request);
- return (Long) response.get(CdcrParams.CHECKPOINT);
- }
- }
-
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
deleted file mode 100644
index 151615e9e4f..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A state manager which implements an observer pattern to notify observers
- * of a state change.
- */
-abstract class CdcrStateManager {
-
- private List observers = new ArrayList<>();
-
- void register(CdcrStateObserver observer) {
- this.observers.add(observer);
- }
-
- void callback() {
- for (CdcrStateObserver observer : observers) {
- observer.stateUpdate();
- }
- }
-
- interface CdcrStateObserver {
-
- void stateUpdate();
-
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java b/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
deleted file mode 100644
index 52590ee254e..00000000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * Synchronize periodically the update log of non-leader nodes with their leaders.
- *
- *
- * Non-leader nodes must always buffer updates in case of leader failures. They have to periodically
- * synchronize their update logs with their leader to remove old transaction logs that will never be used anymore.
- * This is performed by a background thread that is scheduled with a fixed delay. The background thread is sending
- * the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#LASTPROCESSEDVERSION} to the leader to retrieve
- * the lowest last version number processed. This version is then used to move forward the buffer log reader.
- *
- */
-class CdcrUpdateLogSynchronizer implements CdcrStateManager.CdcrStateObserver {
-
- private CdcrLeaderStateManager leaderStateManager;
- private ScheduledExecutorService scheduler;
-
- private final SolrCore core;
- private final String collection;
- private final String shardId;
- private final String path;
-
- private int timeSchedule = DEFAULT_TIME_SCHEDULE;
-
- private static final int DEFAULT_TIME_SCHEDULE = 60000; // by default, every minute
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- CdcrUpdateLogSynchronizer(SolrCore core, String path, SolrParams updateLogSynchonizerConfiguration) {
- this.core = core;
- this.path = path;
- this.collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- this.shardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
- if (updateLogSynchonizerConfiguration != null) {
- this.timeSchedule = updateLogSynchonizerConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
- }
- }
-
- void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
- this.leaderStateManager = leaderStateManager;
- this.leaderStateManager.register(this);
- }
-
- @Override
- public void stateUpdate() {
- // If I am not the leader, I need to synchronise periodically my update log with my leader.
- if (!leaderStateManager.amILeader()) {
- scheduler = Executors.newSingleThreadScheduledExecutor(new SolrNamedThreadFactory("cdcr-update-log-synchronizer"));
- scheduler.scheduleWithFixedDelay(new UpdateLogSynchronisation(), 0, timeSchedule, TimeUnit.MILLISECONDS);
- return;
- }
-
- this.shutdown();
- }
-
- boolean isStarted() {
- return scheduler != null;
- }
-
- void shutdown() {
- if (scheduler != null) {
- // interrupts are often dangerous in Lucene / Solr code, but the
- // test for this will leak threads without
- scheduler.shutdownNow();
- scheduler = null;
- }
- }
-
- private class UpdateLogSynchronisation implements Runnable {
-
- private String getLeaderUrl() {
- ZkController zkController = core.getCoreContainer().getZkController();
- ClusterState cstate = zkController.getClusterState();
- DocCollection docCollection = cstate.getCollection(collection);
- ZkNodeProps leaderProps = docCollection.getLeader(shardId);
- if (leaderProps == null) { // we might not have a leader yet, returns null
- return null;
- }
- ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
- return nodeProps.getCoreUrl();
- }
-
- @Override
- public void run() {
- try {
- String leaderUrl = getLeaderUrl();
- if (leaderUrl == null) { // we might not have a leader yet, stop and try again later
- return;
- }
-
- HttpSolrClient server = new HttpSolrClient.Builder(leaderUrl)
- .withConnectionTimeout(15000)
- .withSocketTimeout(60000)
- .build();
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CommonParams.ACTION, CdcrParams.CdcrAction.LASTPROCESSEDVERSION.toString());
-
- @SuppressWarnings({"rawtypes"})
- SolrRequest request = new QueryRequest(params);
- request.setPath(path);
-
- long lastVersion;
- try {
- @SuppressWarnings({"rawtypes"})
- NamedList response = server.request(request);
- lastVersion = (Long) response.get(CdcrParams.LAST_PROCESSED_VERSION);
- if (log.isDebugEnabled()) {
- log.debug("My leader {} says its last processed _version_ number is: {}. I am {}", leaderUrl, lastVersion,
- core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
- }
- } catch (IOException | SolrServerException e) {
- log.warn("Couldn't get last processed version from leader {}: ", leaderUrl, e);
- return;
- } finally {
- try {
- server.close();
- } catch (IOException ioe) {
- log.warn("Caught exception trying to close client to {}: ", leaderUrl, ioe);
- }
- }
-
- // if we received -1, it means that the log reader on the leader has not yet started to read log entries
- // do nothing
- if (lastVersion == -1) {
- return;
- }
-
- try {
- CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
- if (ulog.isBuffering()) {
- log.debug("Advancing replica buffering tlog reader to {} @ {}:{}", lastVersion, collection, shardId);
- ulog.getBufferToggle().seek(lastVersion);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): ", lastVersion, e);
- } catch (IOException e) {
- log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): ", lastVersion, e);
- }
- } catch (Throwable e) {
- log.warn("Caught unexpected exception", e);
- throw e;
- }
- }
- }
-
-}
-
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 57f8ba08554..2bd20454dbe 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -93,10 +93,7 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.VersionInfo;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.PropertiesOutputStream;
@@ -137,14 +134,10 @@ public class IndexFetcher {
private volatile List