SOLR-13995: Move ZkShardTerms.Terms to SolrJ

This commit is contained in:
noble 2019-12-03 15:16:34 +11:00
parent d8f9f47ca0
commit a51c7b89f2
4 changed files with 286 additions and 235 deletions

View File

@ -20,6 +20,7 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.cloud.ShardTerms;
import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
@ -44,7 +45,7 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
} }
@Override @Override
public boolean onTermChanged(ZkShardTerms.Terms terms) { public boolean onTermChanged(ShardTerms terms) {
if (coreContainer.isShutDown()) return false; if (coreContainer.isShutDown()) return false;
try (SolrCore solrCore = coreContainer.getCore(coreDescriptor.getName())) { try (SolrCore solrCore = coreContainer.getCore(coreDescriptor.getName())) {

View File

@ -18,15 +18,14 @@
package org.apache.solr.cloud; package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.solr.client.solrj.cloud.ShardTerms;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
@ -74,14 +73,12 @@ public class ZkShardTerms implements AutoCloseable{
private final Set<CoreTermWatcher> listeners = new HashSet<>(); private final Set<CoreTermWatcher> listeners = new HashSet<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false);
private static final String RECOVERING_TERM_SUFFIX = "_recovering"; private ShardTerms terms;
private Terms terms;
// Listener of a core for shard's term change events // Listener of a core for shard's term change events
interface CoreTermWatcher { interface CoreTermWatcher {
// return true if the listener wanna to be triggered in the next time // return true if the listener wanna to be triggered in the next time
boolean onTermChanged(Terms terms); boolean onTermChanged(ShardTerms terms);
} }
public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) { public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
@ -103,12 +100,15 @@ public class ZkShardTerms implements AutoCloseable{
public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) { public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
if (replicasNeedingRecovery.isEmpty()) return; if (replicasNeedingRecovery.isEmpty()) return;
Terms newTerms; ShardTerms newTerms;
while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) { while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
if (forceSaveTerms(newTerms)) return; if (forceSaveTerms(newTerms)) return;
} }
} }
public ShardTerms getShardTerms() {
return terms;
}
/** /**
* Can this replica become leader? * Can this replica become leader?
* @param coreNodeName of the replica * @param coreNodeName of the replica
@ -148,7 +148,7 @@ public class ZkShardTerms implements AutoCloseable{
// package private for testing, only used by tests // package private for testing, only used by tests
Map<String, Long> getTerms() { Map<String, Long> getTerms() {
synchronized (writingLock) { synchronized (writingLock) {
return new HashMap<>(terms.values); return terms.getTerms();
} }
} }
@ -178,7 +178,7 @@ public class ZkShardTerms implements AutoCloseable{
// package private for testing, only used by tests // package private for testing, only used by tests
// return true if this object should not be reused // return true if this object should not be reused
boolean removeTerm(String coreNodeName) { boolean removeTerm(String coreNodeName) {
Terms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.removeTerm(coreNodeName)) != null) { while ( (newTerms = terms.removeTerm(coreNodeName)) != null) {
try { try {
if (saveTerms(newTerms)) return false; if (saveTerms(newTerms)) return false;
@ -195,7 +195,7 @@ public class ZkShardTerms implements AutoCloseable{
* @param coreNodeName of the replica * @param coreNodeName of the replica
*/ */
void registerTerm(String coreNodeName) { void registerTerm(String coreNodeName) {
Terms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.registerTerm(coreNodeName)) != null) { while ( (newTerms = terms.registerTerm(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
@ -207,14 +207,14 @@ public class ZkShardTerms implements AutoCloseable{
* @param coreNodeName of the replica * @param coreNodeName of the replica
*/ */
public void setTermEqualsToLeader(String coreNodeName) { public void setTermEqualsToLeader(String coreNodeName) {
Terms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) { while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
} }
public void setTermToZero(String coreNodeName) { public void setTermToZero(String coreNodeName) {
Terms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) { while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
@ -224,7 +224,7 @@ public class ZkShardTerms implements AutoCloseable{
* Mark {@code coreNodeName} as recovering * Mark {@code coreNodeName} as recovering
*/ */
public void startRecovering(String coreNodeName) { public void startRecovering(String coreNodeName) {
Terms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.startRecovering(coreNodeName)) != null) { while ( (newTerms = terms.startRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
@ -234,27 +234,22 @@ public class ZkShardTerms implements AutoCloseable{
* Mark {@code coreNodeName} as finished recovering * Mark {@code coreNodeName} as finished recovering
*/ */
public void doneRecovering(String coreNodeName) { public void doneRecovering(String coreNodeName) {
Terms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) { while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
} }
public boolean isRecovering(String name) { public boolean isRecovering(String name) {
return terms.values.containsKey(recoveringTerm(name)); return terms.isRecovering(name);
} }
public static String recoveringTerm(String coreNodeName) {
return coreNodeName + RECOVERING_TERM_SUFFIX;
}
/** /**
* When first updates come in, all replicas have some data now, * When first updates come in, all replicas have some data now,
* so we must switch from term 0 (registered) to 1 (have some data) * so we must switch from term 0 (registered) to 1 (have some data)
*/ */
public void ensureHighestTermsAreNotZero() { public void ensureHighestTermsAreNotZero() {
Terms newTerms; ShardTerms newTerms;
while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) { while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) {
if (forceSaveTerms(newTerms)) break; if (forceSaveTerms(newTerms)) break;
} }
@ -282,7 +277,7 @@ public class ZkShardTerms implements AutoCloseable{
* @param newTerms to be set * @param newTerms to be set
* @return true if terms is saved successfully to ZK, false if otherwise * @return true if terms is saved successfully to ZK, false if otherwise
*/ */
private boolean forceSaveTerms(Terms newTerms) { private boolean forceSaveTerms(ShardTerms newTerms) {
try { try {
return saveTerms(newTerms); return saveTerms(newTerms);
} catch (KeeperException.NoNodeException e) { } catch (KeeperException.NoNodeException e) {
@ -297,11 +292,11 @@ public class ZkShardTerms implements AutoCloseable{
* @return true if terms is saved successfully to ZK, false if otherwise * @return true if terms is saved successfully to ZK, false if otherwise
* @throws KeeperException.NoNodeException correspond ZK term node is not created * @throws KeeperException.NoNodeException correspond ZK term node is not created
*/ */
private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException { private boolean saveTerms(ShardTerms newTerms) throws KeeperException.NoNodeException {
byte[] znodeData = Utils.toJSON(newTerms.values); byte[] znodeData = Utils.toJSON(newTerms);
try { try {
Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true); Stat stat = zkClient.setData(znodePath, znodeData, newTerms.getVersion(), true);
setNewTerms(new Terms(newTerms.values, stat.getVersion())); setNewTerms(new ShardTerms(newTerms, stat.getVersion()));
log.info("Successful update of terms at {} to {}", znodePath, newTerms); log.info("Successful update of terms at {} to {}", znodePath, newTerms);
return true; return true;
} catch (KeeperException.BadVersionException e) { } catch (KeeperException.BadVersionException e) {
@ -344,11 +339,11 @@ public class ZkShardTerms implements AutoCloseable{
* Fetch latest terms from ZK * Fetch latest terms from ZK
*/ */
public void refreshTerms() { public void refreshTerms() {
Terms newTerms; ShardTerms newTerms;
try { try {
Stat stat = new Stat(); Stat stat = new Stat();
byte[] data = zkClient.getData(znodePath, null, stat, true); byte[] data = zkClient.getData(znodePath, null, stat, true);
newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion()); newTerms = new ShardTerms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
} catch (KeeperException e) { } catch (KeeperException e) {
Thread.interrupted(); Thread.interrupted();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
@ -411,10 +406,10 @@ public class ZkShardTerms implements AutoCloseable{
* Atomically update {@link ZkShardTerms#terms} and call listeners * Atomically update {@link ZkShardTerms#terms} and call listeners
* @param newTerms to be set * @param newTerms to be set
*/ */
private void setNewTerms(Terms newTerms) { private void setNewTerms(ShardTerms newTerms) {
boolean isChanged = false; boolean isChanged = false;
synchronized (writingLock) { synchronized (writingLock) {
if (terms == null || newTerms.version > terms.version) { if (terms == null || newTerms.getVersion() > terms.getVersion()) {
terms = newTerms; terms = newTerms;
isChanged = true; isChanged = true;
} }
@ -422,211 +417,9 @@ public class ZkShardTerms implements AutoCloseable{
if (isChanged) onTermUpdates(newTerms); if (isChanged) onTermUpdates(newTerms);
} }
private void onTermUpdates(Terms newTerms) { private void onTermUpdates(ShardTerms newTerms) {
synchronized (listeners) { synchronized (listeners) {
listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms)); listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
} }
} }
/**
* Hold values of terms, this class is immutable
*/
static class Terms {
private final Map<String, Long> values;
private final long maxTerm;
// ZK node version
private final int version;
public Terms () {
this(new HashMap<>(), 0);
}
public Terms(Map<String, Long> values, int version) {
this.values = values;
this.version = version;
if (values.isEmpty()) this.maxTerm = 0;
else this.maxTerm = Collections.max(values.values());
}
/**
* Can {@code coreNodeName} become leader?
* @param coreNodeName of the replica
* @return true if {@code coreNodeName} can become leader, false if otherwise
*/
boolean canBecomeLeader(String coreNodeName) {
return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName));
}
/**
* Is {@code coreNodeName}'s term highest?
* @param coreNodeName of the replica
* @return true if term of {@code coreNodeName} is highest
*/
boolean haveHighestTermValue(String coreNodeName) {
if (values.isEmpty()) return true;
long maxTerm = Collections.max(values.values());
return values.getOrDefault(coreNodeName, 0L) == maxTerm;
}
Long getTerm(String coreNodeName) {
return values.get(coreNodeName);
}
/**
* Return a new {@link Terms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery}
* @param leader coreNodeName of leader
* @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
* @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery}
*/
Terms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
if (!values.containsKey(leader)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
}
boolean changed = false;
boolean foundReplicasInLowerTerms = false;
HashMap<String, Long> newValues = new HashMap<>(values);
long leaderTerm = newValues.get(leader);
for (Map.Entry<String, Long> entry : newValues.entrySet()) {
String key = entry.getKey();
if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true;
if (Objects.equals(entry.getValue(), leaderTerm)) {
if(skipIncreaseTermOf(key, replicasNeedingRecovery)) {
changed = true;
} else {
newValues.put(key, leaderTerm+1);
}
}
}
// We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
// this may indicate that the current value is stale
if (!changed && foundReplicasInLowerTerms) return null;
return new Terms(newValues, version);
}
private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) {
if (key.endsWith(RECOVERING_TERM_SUFFIX)) {
key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length());
}
return replicasNeedingRecovery.contains(key);
}
/**
* Return a new {@link Terms} in which highest terms are not zero
* @return null if highest terms are already larger than zero
*/
Terms ensureHighestTermsAreNotZero() {
if (maxTerm > 0) return null;
else {
HashMap<String, Long> newValues = new HashMap<>(values);
for (String replica : values.keySet()) {
newValues.put(replica, 1L);
}
return new Terms(newValues, version);
}
}
/**
* Return a new {@link Terms} in which terms for the {@code coreNodeName} are removed
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already not exist
*/
Terms removeTerm(String coreNodeName) {
if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(coreNodeName);
newValues.remove(recoveringTerm(coreNodeName));
return new Terms(newValues, version);
}
/**
* Return a new {@link Terms} in which the associate term of {@code coreNodeName} is not null
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already exist
*/
Terms registerTerm(String coreNodeName) {
if (values.containsKey(coreNodeName)) return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, 0L);
return new Terms(newValues, version);
}
Terms setTermToZero(String coreNodeName) {
if (values.getOrDefault(coreNodeName, -1L) == 0) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, 0L);
return new Terms(newValues, version);
}
/**
* Return a new {@link Terms} in which the term of {@code coreNodeName} is max
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already maximum
*/
Terms setTermEqualsToLeader(String coreNodeName) {
long maxTerm = getMaxTerm();
if (values.get(coreNodeName) == maxTerm) return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, maxTerm);
newValues.remove(recoveringTerm(coreNodeName));
return new Terms(newValues, version);
}
long getMaxTerm() {
return maxTerm;
}
/**
* Mark {@code coreNodeName} as recovering
* @param coreNodeName of the replica
* @return null if {@code coreNodeName} is already marked as doing recovering
*/
Terms startRecovering(String coreNodeName) {
long maxTerm = getMaxTerm();
if (values.get(coreNodeName) == maxTerm)
return null;
HashMap<String, Long> newValues = new HashMap<>(values);
if (!newValues.containsKey(recoveringTerm(coreNodeName))) {
long currentTerm = newValues.getOrDefault(coreNodeName, 0L);
// by keeping old term, we will have more information in leader election
newValues.put(recoveringTerm(coreNodeName), currentTerm);
}
newValues.put(coreNodeName, maxTerm);
return new Terms(newValues, version);
}
/**
* Mark {@code coreNodeName} as finished recovering
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already finished doing recovering
*/
Terms doneRecovering(String coreNodeName) {
if (!values.containsKey(recoveringTerm(coreNodeName))) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(recoveringTerm(coreNodeName));
return new Terms(newValues, version);
}
@Override
public String toString() {
return "Terms{" +
"values=" + values +
", version=" + version +
'}';
}
}
} }

View File

@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.ShardTerms;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut; import org.apache.solr.util.TimeOut;
@ -267,7 +268,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
public void testEnsureTermsIsHigher() { public void testEnsureTermsIsHigher() {
Map<String, Long> map = new HashMap<>(); Map<String, Long> map = new HashMap<>();
map.put("leader", 0L); map.put("leader", 0L);
ZkShardTerms.Terms terms = new ZkShardTerms.Terms(map, 0); ShardTerms terms = new ShardTerms(map, 0);
terms = terms.increaseTerms("leader", Collections.singleton("replica")); terms = terms.increaseTerms("leader", Collections.singleton("replica"));
assertEquals(1L, terms.getTerm("leader").longValue()); assertEquals(1L, terms.getTerm("leader").longValue());
} }

View File

@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
/**
* Hold values of terms, this class is immutable. Create a new instance for every mutation
*/
public class ShardTerms implements MapWriter {
private static final String RECOVERING_TERM_SUFFIX = "_recovering";
private final Map<String, Long> values;
private final long maxTerm;
// ZK node version
private final int version;
public ShardTerms () {
this(new HashMap<>(), 0);
}
public ShardTerms(ShardTerms newTerms, int version) {
this(newTerms.values, version);
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
values.forEach(ew.getBiConsumer());
}
public ShardTerms(Map<String, Long> values, int version) {
this.values = values;
this.version = version;
if (values.isEmpty()) this.maxTerm = 0;
else this.maxTerm = Collections.max(values.values());
}
/**
* Can {@code coreNodeName} become leader?
* @param coreNodeName of the replica
* @return true if {@code coreNodeName} can become leader, false if otherwise
*/
public boolean canBecomeLeader(String coreNodeName) {
return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName));
}
/**
* Is {@code coreNodeName}'s term highest?
* @param coreNodeName of the replica
* @return true if term of {@code coreNodeName} is highest
*/
public boolean haveHighestTermValue(String coreNodeName) {
if (values.isEmpty()) return true;
long maxTerm = Collections.max(values.values());
return values.getOrDefault(coreNodeName, 0L) == maxTerm;
}
public Long getTerm(String coreNodeName) {
return values.get(coreNodeName);
}
/**
* Return a new {@link ShardTerms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery}
* @param leader coreNodeName of leader
* @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
* @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery}
*/
public ShardTerms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
if (!values.containsKey(leader)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
}
boolean changed = false;
boolean foundReplicasInLowerTerms = false;
HashMap<String, Long> newValues = new HashMap<>(values);
long leaderTerm = newValues.get(leader);
for (Map.Entry<String, Long> entry : newValues.entrySet()) {
String key = entry.getKey();
if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true;
if (Objects.equals(entry.getValue(), leaderTerm)) {
if(skipIncreaseTermOf(key, replicasNeedingRecovery)) {
changed = true;
} else {
newValues.put(key, leaderTerm+1);
}
}
}
// We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
// this may indicate that the current value is stale
if (!changed && foundReplicasInLowerTerms) return null;
return new ShardTerms(newValues, version);
}
private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) {
if (key.endsWith(RECOVERING_TERM_SUFFIX)) {
key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length());
}
return replicasNeedingRecovery.contains(key);
}
/**
* Return a new {@link ShardTerms} in which highest terms are not zero
* @return null if highest terms are already larger than zero
*/
public ShardTerms ensureHighestTermsAreNotZero() {
if (maxTerm > 0) return null;
else {
HashMap<String, Long> newValues = new HashMap<>(values);
for (String replica : values.keySet()) {
newValues.put(replica, 1L);
}
return new ShardTerms(newValues, version);
}
}
/**
* Return a new {@link ShardTerms} in which terms for the {@code coreNodeName} are removed
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already not exist
*/
public ShardTerms removeTerm(String coreNodeName) {
if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(coreNodeName);
newValues.remove(recoveringTerm(coreNodeName));
return new ShardTerms(newValues, version);
}
/**
* Return a new {@link ShardTerms} in which the associate term of {@code coreNodeName} is not null
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already exist
*/
public ShardTerms registerTerm(String coreNodeName) {
if (values.containsKey(coreNodeName)) return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, 0L);
return new ShardTerms(newValues, version);
}
public ShardTerms setTermToZero(String coreNodeName) {
if (values.getOrDefault(coreNodeName, -1L) == 0) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, 0L);
return new ShardTerms(newValues, version);
}
/**
* Return a new {@link ShardTerms} in which the term of {@code coreNodeName} is max
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already maximum
*/
public ShardTerms setTermEqualsToLeader(String coreNodeName) {
long maxTerm = getMaxTerm();
if (values.get(coreNodeName) == maxTerm) return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, maxTerm);
newValues.remove(recoveringTerm(coreNodeName));
return new ShardTerms(newValues, version);
}
public long getMaxTerm() {
return maxTerm;
}
/**
* Mark {@code coreNodeName} as recovering
* @param coreNodeName of the replica
* @return null if {@code coreNodeName} is already marked as doing recovering
*/
public ShardTerms startRecovering(String coreNodeName) {
long maxTerm = getMaxTerm();
if (values.get(coreNodeName) == maxTerm)
return null;
HashMap<String, Long> newValues = new HashMap<>(values);
if (!newValues.containsKey(recoveringTerm(coreNodeName))) {
long currentTerm = newValues.getOrDefault(coreNodeName, 0L);
// by keeping old term, we will have more information in leader election
newValues.put(recoveringTerm(coreNodeName), currentTerm);
}
newValues.put(coreNodeName, maxTerm);
return new ShardTerms(newValues, version);
}
/**
* Mark {@code coreNodeName} as finished recovering
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already finished doing recovering
*/
public ShardTerms doneRecovering(String coreNodeName) {
if (!values.containsKey(recoveringTerm(coreNodeName))) {
return null;
}
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.remove(recoveringTerm(coreNodeName));
return new ShardTerms(newValues, version);
}
public static String recoveringTerm(String coreNodeName) {
return coreNodeName + RECOVERING_TERM_SUFFIX;
}
@Override
public String toString() {
return "Terms{" +
"values=" + values +
", version=" + version +
'}';
}
public int getVersion() {
return version;
}
public Map<String , Long> getTerms() {
return new HashMap<>(this.values);
}
public boolean isRecovering(String name) {
return values.containsKey(recoveringTerm(name));
}
}