mirror of https://github.com/apache/lucene.git
SOLR-11548: make ZkOperation an interface
This commit is contained in:
parent
f47523d05b
commit
a72330e9b9
|
@ -21,10 +21,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkCmdExecutor;
|
||||
import org.apache.solr.common.cloud.ZkOperation;
|
||||
import org.apache.solr.util.AbstractSolrTestCase;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -205,14 +203,11 @@ public class ZkSolrClientTest extends AbstractSolrTestCase {
|
|||
ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(timeout);
|
||||
final long start = System.nanoTime();
|
||||
try {
|
||||
zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public String execute() throws KeeperException, InterruptedException {
|
||||
if (System.nanoTime() - start > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)) {
|
||||
throw new KeeperException.SessionExpiredException();
|
||||
}
|
||||
throw new KeeperException.ConnectionLossException();
|
||||
zkCmdExecutor.retryOperation(() -> {
|
||||
if (System.nanoTime() - start > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)) {
|
||||
throw new KeeperException.SessionExpiredException();
|
||||
}
|
||||
throw new KeeperException.ConnectionLossException();
|
||||
});
|
||||
} catch(KeeperException.SessionExpiredException e) {
|
||||
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.commons.io.FileUtils;
|
|||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.StringUtils;
|
||||
import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.common.util.ObjectReleaseTracker;
|
||||
import org.apache.solr.common.util.SolrjNamedThreadFactory;
|
||||
|
@ -144,18 +143,15 @@ public class SolrZkClient implements Closeable {
|
|||
|
||||
try {
|
||||
strat.connect(zkServerAddress, zkClientTimeout, wrapWatcher(connManager),
|
||||
new ZkUpdate() {
|
||||
@Override
|
||||
public void update(SolrZooKeeper zooKeeper) {
|
||||
SolrZooKeeper oldKeeper = keeper;
|
||||
keeper = zooKeeper;
|
||||
try {
|
||||
closeKeeper(oldKeeper);
|
||||
} finally {
|
||||
if (isClosed) {
|
||||
// we may have been closed
|
||||
closeKeeper(SolrZkClient.this.keeper);
|
||||
}
|
||||
zooKeeper -> {
|
||||
SolrZooKeeper oldKeeper = keeper;
|
||||
keeper = zooKeeper;
|
||||
try {
|
||||
closeKeeper(oldKeeper);
|
||||
} finally {
|
||||
if (isClosed) {
|
||||
// we may have been closed
|
||||
closeKeeper(SolrZkClient.this.keeper);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -240,12 +236,9 @@ public class SolrZkClient implements Closeable {
|
|||
public void delete(final String path, final int version, boolean retryOnConnLoss)
|
||||
throws InterruptedException, KeeperException {
|
||||
if (retryOnConnLoss) {
|
||||
zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public Stat execute() throws KeeperException, InterruptedException {
|
||||
keeper.delete(path, version);
|
||||
return null;
|
||||
}
|
||||
zkCmdExecutor.retryOperation(() -> {
|
||||
keeper.delete(path, version);
|
||||
return null;
|
||||
});
|
||||
} else {
|
||||
keeper.delete(path, version);
|
||||
|
@ -300,12 +293,7 @@ public class SolrZkClient implements Closeable {
|
|||
public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss)
|
||||
throws KeeperException, InterruptedException {
|
||||
if (retryOnConnLoss) {
|
||||
return zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public Stat execute() throws KeeperException, InterruptedException {
|
||||
return keeper.exists(path, wrapWatcher(watcher));
|
||||
}
|
||||
});
|
||||
return zkCmdExecutor.retryOperation(() -> keeper.exists(path, wrapWatcher(watcher)));
|
||||
} else {
|
||||
return keeper.exists(path, wrapWatcher(watcher));
|
||||
}
|
||||
|
@ -317,12 +305,7 @@ public class SolrZkClient implements Closeable {
|
|||
public Boolean exists(final String path, boolean retryOnConnLoss)
|
||||
throws KeeperException, InterruptedException {
|
||||
if (retryOnConnLoss) {
|
||||
return zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public Boolean execute() throws KeeperException, InterruptedException {
|
||||
return keeper.exists(path, null) != null;
|
||||
}
|
||||
});
|
||||
return zkCmdExecutor.retryOperation(() -> keeper.exists(path, null) != null);
|
||||
} else {
|
||||
return keeper.exists(path, null) != null;
|
||||
}
|
||||
|
@ -334,12 +317,7 @@ public class SolrZkClient implements Closeable {
|
|||
public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss)
|
||||
throws KeeperException, InterruptedException {
|
||||
if (retryOnConnLoss) {
|
||||
return zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public List<String> execute() throws KeeperException, InterruptedException {
|
||||
return keeper.getChildren(path, wrapWatcher(watcher));
|
||||
}
|
||||
});
|
||||
return zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher)));
|
||||
} else {
|
||||
return keeper.getChildren(path, wrapWatcher(watcher));
|
||||
}
|
||||
|
@ -351,12 +329,7 @@ public class SolrZkClient implements Closeable {
|
|||
public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
|
||||
throws KeeperException, InterruptedException {
|
||||
if (retryOnConnLoss) {
|
||||
return zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public byte[] execute() throws KeeperException, InterruptedException {
|
||||
return keeper.getData(path, wrapWatcher(watcher), stat);
|
||||
}
|
||||
});
|
||||
return zkCmdExecutor.retryOperation(() -> keeper.getData(path, wrapWatcher(watcher), stat));
|
||||
} else {
|
||||
return keeper.getData(path, wrapWatcher(watcher), stat);
|
||||
}
|
||||
|
@ -368,12 +341,7 @@ public class SolrZkClient implements Closeable {
|
|||
public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss)
|
||||
throws KeeperException, InterruptedException {
|
||||
if (retryOnConnLoss) {
|
||||
return zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public Stat execute() throws KeeperException, InterruptedException {
|
||||
return keeper.setData(path, data, version);
|
||||
}
|
||||
});
|
||||
return zkCmdExecutor.retryOperation(() -> keeper.setData(path, data, version));
|
||||
} else {
|
||||
return keeper.setData(path, data, version);
|
||||
}
|
||||
|
@ -386,13 +354,8 @@ public class SolrZkClient implements Closeable {
|
|||
final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException,
|
||||
InterruptedException {
|
||||
if (retryOnConnLoss) {
|
||||
return zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public String execute() throws KeeperException, InterruptedException {
|
||||
return keeper.create(path, data, zkACLProvider.getACLsToAdd(path),
|
||||
createMode);
|
||||
}
|
||||
});
|
||||
return zkCmdExecutor.retryOperation(() -> keeper.create(path, data, zkACLProvider.getACLsToAdd(path),
|
||||
createMode));
|
||||
} else {
|
||||
List<ACL> acls = zkACLProvider.getACLsToAdd(path);
|
||||
return keeper.create(path, data, acls, createMode);
|
||||
|
@ -521,12 +484,9 @@ public class SolrZkClient implements Closeable {
|
|||
if (retry) {
|
||||
final CreateMode finalMode = mode;
|
||||
final byte[] finalBytes = bytes;
|
||||
zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public Object execute() throws KeeperException, InterruptedException {
|
||||
keeper.create(currentPath, finalBytes, zkACLProvider.getACLsToAdd(currentPath), finalMode);
|
||||
return null;
|
||||
}
|
||||
zkCmdExecutor.retryOperation(() -> {
|
||||
keeper.create(currentPath, finalBytes, zkACLProvider.getACLsToAdd(currentPath), finalMode);
|
||||
return null;
|
||||
});
|
||||
} else {
|
||||
keeper.create(currentPath, bytes, zkACLProvider.getACLsToAdd(currentPath), mode);
|
||||
|
@ -587,12 +547,7 @@ public class SolrZkClient implements Closeable {
|
|||
|
||||
public List<OpResult> multi(final Iterable<Op> ops, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
|
||||
if (retryOnConnLoss) {
|
||||
return zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public List<OpResult> execute() throws KeeperException, InterruptedException {
|
||||
return keeper.multi(ops);
|
||||
}
|
||||
});
|
||||
return zkCmdExecutor.retryOperation(() -> keeper.multi(ops));
|
||||
} else {
|
||||
return keeper.multi(ops);
|
||||
}
|
||||
|
@ -766,12 +721,7 @@ public class SolrZkClient implements Closeable {
|
|||
*/
|
||||
public Stat setACL(String path, List<ACL> acls, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
|
||||
if (retryOnConnLoss) {
|
||||
return zkCmdExecutor.retryOperation(new ZkOperation() {
|
||||
@Override
|
||||
public Stat execute() throws KeeperException, InterruptedException {
|
||||
return keeper.setACL(path, acls, -1);
|
||||
}
|
||||
});
|
||||
return zkCmdExecutor.retryOperation(() -> keeper.setACL(path, acls, -1));
|
||||
} else {
|
||||
return keeper.setACL(path, acls, -1);
|
||||
}
|
||||
|
|
|
@ -68,12 +68,12 @@ public abstract class ZkClientConnectionStrategy {
|
|||
}
|
||||
|
||||
public interface DisconnectedListener {
|
||||
public void disconnected();
|
||||
};
|
||||
void disconnected();
|
||||
}
|
||||
|
||||
public interface ConnectedListener {
|
||||
public void connected();
|
||||
};
|
||||
void connected();
|
||||
}
|
||||
|
||||
|
||||
public synchronized void addDisconnectedListener(DisconnectedListener listener) {
|
||||
|
@ -83,9 +83,9 @@ public abstract class ZkClientConnectionStrategy {
|
|||
public synchronized void addConnectedListener(ConnectedListener listener) {
|
||||
connectedListeners.add(listener);
|
||||
}
|
||||
|
||||
public static abstract class ZkUpdate {
|
||||
public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
|
||||
|
||||
public interface ZkUpdate {
|
||||
void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
|
||||
}
|
||||
|
||||
public void setZkCredentialsToAddAutomatically(ZkCredentialsProvider zkCredentialsToAddAutomatically) {
|
||||
|
|
|
@ -21,7 +21,7 @@ import org.apache.zookeeper.KeeperException;
|
|||
/**
|
||||
* A callback object which can be used for implementing retry-able operations.
|
||||
*/
|
||||
public abstract class ZkOperation {
|
||||
public interface ZkOperation {
|
||||
|
||||
/**
|
||||
* Performs the operation - which may be involved multiple times if the connection
|
||||
|
@ -29,5 +29,5 @@ public abstract class ZkOperation {
|
|||
*
|
||||
* @return the result of the operation or null
|
||||
*/
|
||||
public abstract Object execute() throws KeeperException, InterruptedException;
|
||||
Object execute() throws KeeperException, InterruptedException;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue