SOLR-14474: Fix remaining auxilliary class warnings in Solr

This commit is contained in:
Erick Erickson 2020-05-27 12:06:29 -04:00
parent ac80fb9979
commit 07a9b5d1b0
26 changed files with 1651 additions and 1538 deletions

View File

@ -263,6 +263,8 @@ Other Changes
* SOLR-14280: Improve error reporting in SolrConfig (Andras Salamon via Jason Gerlowski) * SOLR-14280: Improve error reporting in SolrConfig (Andras Salamon via Jason Gerlowski)
* SOLR-14474: Fix remaining auxilliary class warnings in Solr (Erick Erickson)
================== 8.5.1 ================== ================== 8.5.1 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -19,50 +19,13 @@ package org.apache.solr.cloud;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.RetryUtil;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.OpResult.SetDataResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.ID;
public abstract class ElectionContext implements Closeable { public abstract class ElectionContext implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final String electionPath; final String electionPath;
@ -111,676 +74,4 @@ public abstract class ElectionContext implements Closeable {
} }
} }
class ShardLeaderElectionContextBase extends ElectionContext {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final SolrZkClient zkClient;
protected String shardId;
protected String collection;
protected LeaderElector leaderElector;
protected ZkStateReader zkStateReader;
protected ZkController zkController;
private Integer leaderZkNodeParentVersion;
// Prevents a race between cancelling and becoming leader.
private final Object lock = new Object();
public ShardLeaderElectionContextBase(LeaderElector leaderElector,
final String shardId, final String collection, final String coreNodeName,
ZkNodeProps props, ZkController zkController) {
super(coreNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
+ "/leader_elect/" + shardId, ZkStateReader.getShardLeadersPath(
collection, shardId), props, zkController.getZkClient());
this.leaderElector = leaderElector;
this.zkStateReader = zkController.getZkStateReader();
this.zkClient = zkStateReader.getZkClient();
this.zkController = zkController;
this.shardId = shardId;
this.collection = collection;
String parent = new Path(leaderPath).getParent().toString();
ZkCmdExecutor zcmd = new ZkCmdExecutor(30000);
// only if /collections/{collection} exists already do we succeed in creating this path
log.info("make sure parent is created {}", parent);
try {
zcmd.ensureExists(parent, (byte[])null, CreateMode.PERSISTENT, zkClient, 2);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@Override
public void cancelElection() throws InterruptedException, KeeperException {
super.cancelElection();
synchronized (lock) {
if (leaderZkNodeParentVersion != null) {
try {
// We need to be careful and make sure we *only* delete our own leader registration node.
// We do this by using a multi and ensuring the parent znode of the leader registration node
// matches the version we expect - there is a setData call that increments the parent's znode
// version whenever a leader registers.
log.debug("Removing leader registration node on cancel: {} {}", leaderPath, leaderZkNodeParentVersion);
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(new Path(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
ops.add(Op.delete(leaderPath, -1));
zkClient.multi(ops, true);
} catch (KeeperException.NoNodeException nne) {
// no problem
log.debug("No leader registration node found to remove: {}", leaderPath);
} catch (KeeperException.BadVersionException bve) {
log.info("Cannot remove leader registration node because the current registered node is not ours: {}", leaderPath);
// no problem
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
SolrException.log(log, e);
}
leaderZkNodeParentVersion = null;
} else {
log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");
}
}
}
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
throws KeeperException, InterruptedException, IOException {
// register as leader - if an ephemeral is already there, wait to see if it goes away
String parent = new Path(leaderPath).getParent().toString();
try {
RetryUtil.retryOnThrowable(NodeExistsException.class, 60000, 5000, () -> {
synchronized (lock) {
log.info("Creating leader registration node {} after winning as {}", leaderPath, leaderSeqPath);
List<Op> ops = new ArrayList<>(2);
// We use a multi operation to get the parent nodes version, which will
// be used to make sure we only remove our own leader registration node.
// The setData call used to get the parent version is also the trigger to
// increment the version. We also do a sanity check that our leaderSeqPath exists.
ops.add(Op.check(leaderSeqPath, -1));
ops.add(Op.create(leaderPath, Utils.toJSON(leaderProps), zkClient.getZkACLProvider().getACLsToAdd(leaderPath), CreateMode.EPHEMERAL));
ops.add(Op.setData(parent, null, -1));
List<OpResult> results;
results = zkClient.multi(ops, true);
for (OpResult result : results) {
if (result.getType() == ZooDefs.OpCode.setData) {
SetDataResult dresult = (SetDataResult) result;
Stat stat = dresult.getStat();
leaderZkNodeParentVersion = stat.getVersion();
return;
}
}
assert leaderZkNodeParentVersion != null;
}
});
} catch (NoNodeException e) {
log.info("Will not register as leader because it seems the election is no longer taking place.");
return;
} catch (Throwable t) {
if (t instanceof OutOfMemoryError) {
throw (OutOfMemoryError) t;
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t);
}
assert shardId != null;
boolean isAlreadyLeader = false;
if (zkStateReader.getClusterState() != null &&
zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() < 2) {
Replica leader = zkStateReader.getLeader(collection, shardId);
if (leader != null
&& leader.getBaseUrl().equals(leaderProps.get(ZkStateReader.BASE_URL_PROP))
&& leader.getCoreName().equals(leaderProps.get(ZkStateReader.CORE_NAME_PROP))) {
isAlreadyLeader = true;
}
}
if (!isAlreadyLeader) {
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
assert zkController != null;
assert zkController.getOverseer() != null;
zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
}
}
public LeaderElector getLeaderElector() {
return leaderElector;
}
Integer getLeaderZkNodeParentVersion() {
synchronized (lock) {
return leaderZkNodeParentVersion;
}
}
}
// add core container and stop passing core around...
final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final CoreContainer cc;
private final SyncStrategy syncStrategy;
private volatile boolean isClosed = false;
public ShardLeaderElectionContext(LeaderElector leaderElector,
final String shardId, final String collection,
final String coreNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) {
super(leaderElector, shardId, collection, coreNodeName, props,
zkController);
this.cc = cc;
syncStrategy = new SyncStrategy(cc);
}
@Override
public void close() {
super.close();
this.isClosed = true;
syncStrategy.close();
}
@Override
public void cancelElection() throws InterruptedException, KeeperException {
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
}
}
super.cancelElection();
}
@Override
public ElectionContext copy() {
return new ShardLeaderElectionContext(leaderElector, shardId, collection, id, leaderProps, zkController, cc);
}
/*
* weAreReplacement: has someone else been the leader already?
*/
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart) throws KeeperException,
InterruptedException, IOException {
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
ActionThrottle lt;
try (SolrCore core = cc.getCore(coreName)) {
if (core == null ) {
// shutdown or removed
return;
}
MDCLoggingContext.setCore(core);
lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
}
try {
lt.minimumWaitBetweenActions();
lt.markAttemptingAction();
int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
log.debug("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait);
if (zkController.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() > 1) {
// Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica.
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
}
boolean allReplicasInLine = false;
if (!weAreReplacement) {
allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait);
} else {
allReplicasInLine = areAllReplicasParticipating();
}
if (isClosed) {
// Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the later,
// we cannot be sure we are still the leader, so we should bail out. The OnReconnect handler will
// re-register the cores and handle a new leadership election.
return;
}
Replica.Type replicaType;
String coreNodeName;
boolean setTermToMax = false;
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
return;
}
replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
// should I be leader?
ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
if (zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreNodeName, leaderVoteWait)) {
rejoinLeaderElection(core);
return;
} else {
// only log an error if this replica win the election
setTermToMax = true;
}
}
if (isClosed) {
return;
}
log.info("I may be the new leader - try and sync");
// we are going to attempt to be the leader
// first cancel any current recovery
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
if (weAreReplacement) {
// wait a moment for any floating updates to finish
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e);
}
}
PeerSync.PeerSyncResult result = null;
boolean success = false;
try {
result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
success = result.isSuccess();
} catch (Exception e) {
SolrException.log(log, "Exception while trying to sync", e);
result = PeerSync.PeerSyncResult.failure();
}
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (!success) {
boolean hasRecentUpdates = false;
if (ulog != null) {
// TODO: we could optimize this if necessary
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
}
}
if (!hasRecentUpdates) {
// we failed sync, but we have no versions - we can't sync in that case
// - we were active
// before, so become leader anyway if no one else has any versions either
if (result.getOtherHasVersions().orElse(false)) {
log.info("We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader");
success = false;
} else {
log.info(
"We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
success = true;
}
}
}
// solrcloud_debug
if (log.isDebugEnabled()) {
try {
RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
try {
if (log.isDebugEnabled()) {
log.debug("{} synched {}", core.getCoreContainer().getZkController().getNodeName()
, searcher.count(new MatchAllDocsQuery()));
}
} finally {
searchHolder.decref();
}
} catch (Exception e) {
log.error("Error in solrcloud_debug block", e);
}
}
if (!success) {
rejoinLeaderElection(core);
return;
}
}
boolean isLeader = true;
if (!isClosed) {
try {
if (replicaType == Replica.Type.TLOG) {
// stop replicate from old leader
zkController.stopReplicationFromLeader(coreName);
if (weAreReplacement) {
try (SolrCore core = cc.getCore(coreName)) {
Future<UpdateLog.RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog();
if (future != null) {
log.info("Replaying tlog before become new leader");
future.get();
} else {
log.info("New leader does not have old tlog to replay");
}
}
}
}
// in case of leaderVoteWait timeout, a replica with lower term can win the election
if (setTermToMax) {
log.error("WARNING: Potential data loss -- Replica {} became leader after timeout (leaderVoteWait) {}"
, "without being up-to-date with the previous leader", coreNodeName);
zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName);
}
super.runLeaderProcess(weAreReplacement, 0);
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
publishActiveIfRegisteredAndNotActive(core);
} else {
return;
}
}
if (log.isInfoEnabled()) {
log.info("I am the new leader: {} {}", ZkCoreNodeProps.getCoreUrl(leaderProps), shardId);
}
// we made it as leader - send any recovery requests we need to
syncStrategy.requestRecoveries();
} catch (SessionExpiredException e) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
} catch (Exception e) {
isLeader = false;
SolrException.log(log, "There was a problem trying to register as the leader", e);
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
if (log.isDebugEnabled()) {
log.debug("SolrCore not found: {} in {}", coreName, cc.getLoadedCoreNames());
}
return;
}
core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
// we could not publish ourselves as leader - try and rejoin election
try {
rejoinLeaderElection(core);
} catch (SessionExpiredException exc) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
}
}
}
} else {
cancelElection();
}
} finally {
MDCLoggingContext.clear();
}
}
/**
* Wait for other replicas with higher terms participate in the electioon
* @return true if after {@code timeout} there are no other replicas with higher term participate in the election,
* false if otherwise
*/
private boolean waitForEligibleBecomeLeaderAfterTimeout(ZkShardTerms zkShardTerms, String coreNodeName, int timeout) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
while (!isClosed && !cc.isShutDown()) {
if (System.nanoTime() > timeoutAt) {
log.warn("After waiting for {}ms, no other potential leader was found, {} try to become leader anyway (core_term:{}, highest_term:{})",
timeout, coreNodeName, zkShardTerms.getTerm(coreNodeName), zkShardTerms.getHighestTerm());
return true;
}
if (replicasWithHigherTermParticipated(zkShardTerms, coreNodeName)) {
log.info("Can't become leader, other replicas with higher term participated in leader election");
return false;
}
Thread.sleep(500L);
}
return false;
}
/**
* Do other replicas with higher term participated in the election
* @return true if other replicas with higher term participated in the election, false if otherwise
*/
private boolean replicasWithHigherTermParticipated(ZkShardTerms zkShardTerms, String coreNodeName) {
ClusterState clusterState = zkController.getClusterState();
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
if (slices == null) return false;
long replicaTerm = zkShardTerms.getTerm(coreNodeName);
boolean isRecovering = zkShardTerms.isRecovering(coreNodeName);
for (Replica replica : slices.getReplicas()) {
if (replica.getName().equals(coreNodeName)) continue;
if (clusterState.getLiveNodes().contains(replica.getNodeName())) {
long otherTerm = zkShardTerms.getTerm(replica.getName());
boolean isOtherReplicaRecovering = zkShardTerms.isRecovering(replica.getName());
if (isRecovering && !isOtherReplicaRecovering) return true;
if (otherTerm > replicaTerm) return true;
}
}
return false;
}
public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception {
if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) {
ZkStateReader zkStateReader = zkController.getZkStateReader();
zkStateReader.forceUpdateCollection(collection);
ClusterState clusterState = zkStateReader.getClusterState();
Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
if (rep == null) return;
if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
}
}
}
private Replica getReplica(ClusterState clusterState, String collectionName, String replicaName) {
if (clusterState == null) return null;
final DocCollection docCollection = clusterState.getCollectionOrNull(collectionName);
if (docCollection == null) return null;
return docCollection.getReplica(replicaName);
}
// returns true if all replicas are found to be up, false if not
private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
int cnt = 0;
while (!isClosed && !cc.isShutDown()) {
// wait for everyone to be up
if (slices != null) {
int found = 0;
try {
found = zkClient.getChildren(shardsElectZkPath, null, true).size();
} catch (KeeperException e) {
if (e instanceof KeeperException.SessionExpiredException) {
// if the session has expired, then another election will be launched, so
// quit here
throw new SolrException(ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
}
SolrException.log(log,
"Error checking for the number of election participants", e);
}
// on startup and after connection timeout, wait for all known shards
if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) {
log.info("Enough replicas found to continue.");
return true;
} else {
if (cnt % 40 == 0) {
if (log.isInfoEnabled()) {
log.info("Waiting until we see more replicas up for shard {}: total={} found={} timeoute in={}ms"
, shardId, slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(), found,
TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS));
}
}
}
if (System.nanoTime() > timeoutAt) {
log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
return false;
}
} else {
log.warn("Shard not found: {} for collection {}", shardId, collection);
return false;
}
Thread.sleep(500);
docCollection = zkController.getClusterState().getCollectionOrNull(collection);
slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
cnt++;
}
return false;
}
// returns true if all replicas are found to be up, false if not
private boolean areAllReplicasParticipating() throws InterruptedException {
final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
final DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
if (docCollection != null && docCollection.getSlice(shardId) != null) {
final Slice slices = docCollection.getSlice(shardId);
int found = 0;
try {
found = zkClient.getChildren(shardsElectZkPath, null, true).size();
} catch (KeeperException e) {
if (e instanceof KeeperException.SessionExpiredException) {
// if the session has expired, then another election will be launched, so
// quit here
throw new SolrException(ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
}
SolrException.log(log, "Error checking for the number of election participants", e);
}
if (found >= slices.getReplicasMap().size()) {
log.debug("All replicas are ready to participate in election.");
return true;
}
} else {
log.warn("Shard not found: {} for collection {}", shardId, collection);
return false;
}
return false;
}
private void rejoinLeaderElection(SolrCore core)
throws InterruptedException, KeeperException, IOException {
// remove our ephemeral and re join the election
if (cc.isShutDown()) {
log.debug("Not rejoining election because CoreContainer is closed");
return;
}
log.info("There may be a better leader candidate than us - going back into recovery");
cancelElection();
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
leaderElector.joinElection(this, true);
}
}
final class OverseerElectionContext extends ElectionContext {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SolrZkClient zkClient;
private final Overseer overseer;
private volatile boolean isClosed = false;
public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) {
super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", null, zkClient);
this.overseer = overseer;
this.zkClient = zkClient;
try {
new ZkCmdExecutor(zkClient.getZkClientTimeout()).ensureExists(Overseer.OVERSEER_ELECT, zkClient);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
InterruptedException {
if (isClosed) {
return;
}
log.info("I am going to be the leader {}", id);
final String id = leaderSeqPath
.substring(leaderSeqPath.lastIndexOf("/") + 1);
ZkNodeProps myProps = new ZkNodeProps(ID, id);
zkClient.makePath(leaderPath, Utils.toJSON(myProps),
CreateMode.EPHEMERAL, true);
if(pauseBeforeStartMs >0){
try {
Thread.sleep(pauseBeforeStartMs);
} catch (InterruptedException e) {
Thread.interrupted();
log.warn("Wait interrupted ", e);
}
}
synchronized (this) {
if (!this.isClosed && !overseer.getZkController().getCoreContainer().isShutDown()) {
overseer.start(id);
}
}
}
@Override
public void cancelElection() throws InterruptedException, KeeperException {
super.cancelElection();
overseer.close();
}
@Override
public synchronized void close() {
this.isClosed = true;
overseer.close();
}
@Override
public ElectionContext copy() {
return new OverseerElectionContext(zkClient, overseer ,id);
}
@Override
public void joinedElectionFired() {
overseer.close();
}
@Override
public void checkIfIamLeaderFired() {
// leader changed - close the overseer
overseer.close();
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.ID;
final class OverseerElectionContext extends ElectionContext {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SolrZkClient zkClient;
private final Overseer overseer;
private volatile boolean isClosed = false;
public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) {
super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", null, zkClient);
this.overseer = overseer;
this.zkClient = zkClient;
try {
new ZkCmdExecutor(zkClient.getZkClientTimeout()).ensureExists(Overseer.OVERSEER_ELECT, zkClient);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
InterruptedException {
if (isClosed) {
return;
}
log.info("I am going to be the leader {}", id);
final String id = leaderSeqPath
.substring(leaderSeqPath.lastIndexOf("/") + 1);
ZkNodeProps myProps = new ZkNodeProps(ID, id);
zkClient.makePath(leaderPath, Utils.toJSON(myProps),
CreateMode.EPHEMERAL, true);
if (pauseBeforeStartMs > 0) {
try {
Thread.sleep(pauseBeforeStartMs);
} catch (InterruptedException e) {
Thread.interrupted();
log.warn("Wait interrupted ", e);
}
}
synchronized (this) {
if (!this.isClosed && !overseer.getZkController().getCoreContainer().isShutDown()) {
overseer.start(id);
}
}
}
@Override
public void cancelElection() throws InterruptedException, KeeperException {
super.cancelElection();
overseer.close();
}
@Override
public synchronized void close() {
this.isClosed = true;
overseer.close();
}
@Override
public ElectionContext copy() {
return new OverseerElectionContext(zkClient, overseer, id);
}
@Override
public void joinedElectionFired() {
overseer.close();
}
@Override
public void checkIfIamLeaderFired() {
// leader changed - close the overseer
overseer.close();
}
}

View File

@ -0,0 +1,493 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.EnumSet;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// add core container and stop passing core around...
final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final CoreContainer cc;
private final SyncStrategy syncStrategy;
private volatile boolean isClosed = false;
public ShardLeaderElectionContext(LeaderElector leaderElector,
final String shardId, final String collection,
final String coreNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) {
super(leaderElector, shardId, collection, coreNodeName, props,
zkController);
this.cc = cc;
syncStrategy = new SyncStrategy(cc);
}
@Override
public void close() {
super.close();
this.isClosed = true;
syncStrategy.close();
}
@Override
public void cancelElection() throws InterruptedException, KeeperException {
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
}
}
super.cancelElection();
}
@Override
public ElectionContext copy() {
return new ShardLeaderElectionContext(leaderElector, shardId, collection, id, leaderProps, zkController, cc);
}
/*
* weAreReplacement: has someone else been the leader already?
*/
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart) throws KeeperException,
InterruptedException, IOException {
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
ActionThrottle lt;
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
// shutdown or removed
return;
}
MDCLoggingContext.setCore(core);
lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
}
try {
lt.minimumWaitBetweenActions();
lt.markAttemptingAction();
int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
log.debug("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait);
if (zkController.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() > 1) {
// Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica.
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
}
boolean allReplicasInLine = false;
if (!weAreReplacement) {
allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait);
} else {
allReplicasInLine = areAllReplicasParticipating();
}
if (isClosed) {
// Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the later,
// we cannot be sure we are still the leader, so we should bail out. The OnReconnect handler will
// re-register the cores and handle a new leadership election.
return;
}
Replica.Type replicaType;
String coreNodeName;
boolean setTermToMax = false;
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
return;
}
replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
// should I be leader?
ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
if (zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreNodeName, leaderVoteWait)) {
rejoinLeaderElection(core);
return;
} else {
// only log an error if this replica win the election
setTermToMax = true;
}
}
if (isClosed) {
return;
}
log.info("I may be the new leader - try and sync");
// we are going to attempt to be the leader
// first cancel any current recovery
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
if (weAreReplacement) {
// wait a moment for any floating updates to finish
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
}
}
PeerSync.PeerSyncResult result = null;
boolean success = false;
try {
result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
success = result.isSuccess();
} catch (Exception e) {
SolrException.log(log, "Exception while trying to sync", e);
result = PeerSync.PeerSyncResult.failure();
}
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (!success) {
boolean hasRecentUpdates = false;
if (ulog != null) {
// TODO: we could optimize this if necessary
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
}
}
if (!hasRecentUpdates) {
// we failed sync, but we have no versions - we can't sync in that case
// - we were active
// before, so become leader anyway if no one else has any versions either
if (result.getOtherHasVersions().orElse(false)) {
log.info("We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader");
success = false;
} else {
log.info(
"We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
success = true;
}
}
}
// solrcloud_debug
if (log.isDebugEnabled()) {
try {
RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
try {
if (log.isDebugEnabled()) {
log.debug("{} synched {}", core.getCoreContainer().getZkController().getNodeName()
, searcher.count(new MatchAllDocsQuery()));
}
} finally {
searchHolder.decref();
}
} catch (Exception e) {
log.error("Error in solrcloud_debug block", e);
}
}
if (!success) {
rejoinLeaderElection(core);
return;
}
}
boolean isLeader = true;
if (!isClosed) {
try {
if (replicaType == Replica.Type.TLOG) {
// stop replicate from old leader
zkController.stopReplicationFromLeader(coreName);
if (weAreReplacement) {
try (SolrCore core = cc.getCore(coreName)) {
Future<UpdateLog.RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog();
if (future != null) {
log.info("Replaying tlog before become new leader");
future.get();
} else {
log.info("New leader does not have old tlog to replay");
}
}
}
}
// in case of leaderVoteWait timeout, a replica with lower term can win the election
if (setTermToMax) {
log.error("WARNING: Potential data loss -- Replica {} became leader after timeout (leaderVoteWait) {}"
, "without being up-to-date with the previous leader", coreNodeName);
zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName);
}
super.runLeaderProcess(weAreReplacement, 0);
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
publishActiveIfRegisteredAndNotActive(core);
} else {
return;
}
}
if (log.isInfoEnabled()) {
log.info("I am the new leader: {} {}", ZkCoreNodeProps.getCoreUrl(leaderProps), shardId);
}
// we made it as leader - send any recovery requests we need to
syncStrategy.requestRecoveries();
} catch (SessionExpiredException e) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
} catch (Exception e) {
isLeader = false;
SolrException.log(log, "There was a problem trying to register as the leader", e);
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
if (log.isDebugEnabled()) {
log.debug("SolrCore not found: {} in {}", coreName, cc.getLoadedCoreNames());
}
return;
}
core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
// we could not publish ourselves as leader - try and rejoin election
try {
rejoinLeaderElection(core);
} catch (SessionExpiredException exc) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
}
}
}
} else {
cancelElection();
}
} finally {
MDCLoggingContext.clear();
}
}
/**
* Wait for other replicas with higher terms participate in the electioon
*
* @return true if after {@code timeout} there are no other replicas with higher term participate in the election,
* false if otherwise
*/
private boolean waitForEligibleBecomeLeaderAfterTimeout(ZkShardTerms zkShardTerms, String coreNodeName, int timeout) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
while (!isClosed && !cc.isShutDown()) {
if (System.nanoTime() > timeoutAt) {
log.warn("After waiting for {}ms, no other potential leader was found, {} try to become leader anyway (core_term:{}, highest_term:{})",
timeout, coreNodeName, zkShardTerms.getTerm(coreNodeName), zkShardTerms.getHighestTerm());
return true;
}
if (replicasWithHigherTermParticipated(zkShardTerms, coreNodeName)) {
log.info("Can't become leader, other replicas with higher term participated in leader election");
return false;
}
Thread.sleep(500L);
}
return false;
}
/**
* Do other replicas with higher term participated in the election
*
* @return true if other replicas with higher term participated in the election, false if otherwise
*/
private boolean replicasWithHigherTermParticipated(ZkShardTerms zkShardTerms, String coreNodeName) {
ClusterState clusterState = zkController.getClusterState();
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
if (slices == null) return false;
long replicaTerm = zkShardTerms.getTerm(coreNodeName);
boolean isRecovering = zkShardTerms.isRecovering(coreNodeName);
for (Replica replica : slices.getReplicas()) {
if (replica.getName().equals(coreNodeName)) continue;
if (clusterState.getLiveNodes().contains(replica.getNodeName())) {
long otherTerm = zkShardTerms.getTerm(replica.getName());
boolean isOtherReplicaRecovering = zkShardTerms.isRecovering(replica.getName());
if (isRecovering && !isOtherReplicaRecovering) return true;
if (otherTerm > replicaTerm) return true;
}
}
return false;
}
public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception {
if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) {
ZkStateReader zkStateReader = zkController.getZkStateReader();
zkStateReader.forceUpdateCollection(collection);
ClusterState clusterState = zkStateReader.getClusterState();
Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
if (rep == null) return;
if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
}
}
}
private Replica getReplica(ClusterState clusterState, String collectionName, String replicaName) {
if (clusterState == null) return null;
final DocCollection docCollection = clusterState.getCollectionOrNull(collectionName);
if (docCollection == null) return null;
return docCollection.getReplica(replicaName);
}
// returns true if all replicas are found to be up, false if not
private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
int cnt = 0;
while (!isClosed && !cc.isShutDown()) {
// wait for everyone to be up
if (slices != null) {
int found = 0;
try {
found = zkClient.getChildren(shardsElectZkPath, null, true).size();
} catch (KeeperException e) {
if (e instanceof KeeperException.SessionExpiredException) {
// if the session has expired, then another election will be launched, so
// quit here
throw new SolrException(ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
}
SolrException.log(log,
"Error checking for the number of election participants", e);
}
// on startup and after connection timeout, wait for all known shards
if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) {
log.info("Enough replicas found to continue.");
return true;
} else {
if (cnt % 40 == 0) {
if (log.isInfoEnabled()) {
log.info("Waiting until we see more replicas up for shard {}: total={} found={} timeoute in={}ms"
, shardId, slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(), found,
TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS));
}
}
}
if (System.nanoTime() > timeoutAt) {
log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
return false;
}
} else {
log.warn("Shard not found: {} for collection {}", shardId, collection);
return false;
}
Thread.sleep(500);
docCollection = zkController.getClusterState().getCollectionOrNull(collection);
slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
cnt++;
}
return false;
}
// returns true if all replicas are found to be up, false if not
private boolean areAllReplicasParticipating() throws InterruptedException {
final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
final DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
if (docCollection != null && docCollection.getSlice(shardId) != null) {
final Slice slices = docCollection.getSlice(shardId);
int found = 0;
try {
found = zkClient.getChildren(shardsElectZkPath, null, true).size();
} catch (KeeperException e) {
if (e instanceof KeeperException.SessionExpiredException) {
// if the session has expired, then another election will be launched, so
// quit here
throw new SolrException(ErrorCode.SERVER_ERROR,
"ZK session expired - cancelling election for " + collection + " " + shardId);
}
SolrException.log(log, "Error checking for the number of election participants", e);
}
if (found >= slices.getReplicasMap().size()) {
log.debug("All replicas are ready to participate in election.");
return true;
}
} else {
log.warn("Shard not found: {} for collection {}", shardId, collection);
return false;
}
return false;
}
private void rejoinLeaderElection(SolrCore core)
throws InterruptedException, KeeperException, IOException {
// remove our ephemeral and re join the election
if (cc.isShutDown()) {
log.debug("Not rejoining election because CoreContainer is closed");
return;
}
log.info("There may be a better leader candidate than us - going back into recovery");
cancelElection();
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
leaderElector.joinElection(this, true);
}
}

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.fs.Path;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.RetryUtil;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.OpResult.SetDataResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ShardLeaderElectionContextBase extends ElectionContext {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final SolrZkClient zkClient;
protected String shardId;
protected String collection;
protected LeaderElector leaderElector;
protected ZkStateReader zkStateReader;
protected ZkController zkController;
private Integer leaderZkNodeParentVersion;
// Prevents a race between cancelling and becoming leader.
private final Object lock = new Object();
public ShardLeaderElectionContextBase(LeaderElector leaderElector,
final String shardId, final String collection, final String coreNodeName,
ZkNodeProps props, ZkController zkController) {
super(coreNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
+ "/leader_elect/" + shardId, ZkStateReader.getShardLeadersPath(
collection, shardId), props, zkController.getZkClient());
this.leaderElector = leaderElector;
this.zkStateReader = zkController.getZkStateReader();
this.zkClient = zkStateReader.getZkClient();
this.zkController = zkController;
this.shardId = shardId;
this.collection = collection;
String parent = new Path(leaderPath).getParent().toString();
ZkCmdExecutor zcmd = new ZkCmdExecutor(30000);
// only if /collections/{collection} exists already do we succeed in creating this path
log.info("make sure parent is created {}", parent);
try {
zcmd.ensureExists(parent, (byte[]) null, CreateMode.PERSISTENT, zkClient, 2);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@Override
public void cancelElection() throws InterruptedException, KeeperException {
super.cancelElection();
synchronized (lock) {
if (leaderZkNodeParentVersion != null) {
// no problem
// no problem
try {
// We need to be careful and make sure we *only* delete our own leader registration node.
// We do this by using a multi and ensuring the parent znode of the leader registration node
// matches the version we expect - there is a setData call that increments the parent's znode
// version whenever a leader registers.
log.debug("Removing leader registration node on cancel: {} {}", leaderPath, leaderZkNodeParentVersion);
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(new Path(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
ops.add(Op.delete(leaderPath, -1));
zkClient.multi(ops, true);
} catch (InterruptedException e) {
throw e;
} catch (IllegalArgumentException e) {
SolrException.log(log, e);
}
leaderZkNodeParentVersion = null;
} else {
log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");
}
}
}
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
throws KeeperException, InterruptedException, IOException {
// register as leader - if an ephemeral is already there, wait to see if it goes away
String parent = new Path(leaderPath).getParent().toString();
try {
RetryUtil.retryOnThrowable(NodeExistsException.class, 60000, 5000, () -> {
synchronized (lock) {
log.info("Creating leader registration node {} after winning as {}", leaderPath, leaderSeqPath);
List<Op> ops = new ArrayList<>(2);
// We use a multi operation to get the parent nodes version, which will
// be used to make sure we only remove our own leader registration node.
// The setData call used to get the parent version is also the trigger to
// increment the version. We also do a sanity check that our leaderSeqPath exists.
ops.add(Op.check(leaderSeqPath, -1));
ops.add(Op.create(leaderPath, Utils.toJSON(leaderProps), zkClient.getZkACLProvider().getACLsToAdd(leaderPath), CreateMode.EPHEMERAL));
ops.add(Op.setData(parent, null, -1));
List<OpResult> results;
results = zkClient.multi(ops, true);
for (OpResult result : results) {
if (result.getType() == ZooDefs.OpCode.setData) {
SetDataResult dresult = (SetDataResult) result;
Stat stat = dresult.getStat();
leaderZkNodeParentVersion = stat.getVersion();
return;
}
}
assert leaderZkNodeParentVersion != null;
}
});
} catch (NoNodeException e) {
log.info("Will not register as leader because it seems the election is no longer taking place.");
return;
} catch (Throwable t) {
if (t instanceof OutOfMemoryError) {
throw (OutOfMemoryError) t;
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t);
}
assert shardId != null;
boolean isAlreadyLeader = false;
if (zkStateReader.getClusterState() != null &&
zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() < 2) {
Replica leader = zkStateReader.getLeader(collection, shardId);
if (leader != null
&& leader.getBaseUrl().equals(leaderProps.get(ZkStateReader.BASE_URL_PROP))
&& leader.getCoreName().equals(leaderProps.get(ZkStateReader.CORE_NAME_PROP))) {
isAlreadyLeader = true;
}
}
if (!isAlreadyLeader) {
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
assert zkController != null;
assert zkController.getOverseer() != null;
zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
}
}
public LeaderElector getLeaderElector() {
return leaderElector;
}
Integer getLeaderZkNodeParentVersion() {
synchronized (lock) {
return leaderZkNodeParentVersion;
}
}
}

View File

@ -17,15 +17,11 @@
package org.apache.solr.handler.component; package org.apache.solr.handler.component;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.StatsParams; import org.apache.solr.common.params.StatsParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.SimpleOrderedMap;
@ -41,8 +37,8 @@ public class StatsComponent extends SearchComponent {
@Override @Override
public void prepare(ResponseBuilder rb) throws IOException { public void prepare(ResponseBuilder rb) throws IOException {
if (rb.req.getParams().getBool(StatsParams.STATS,false)) { if (rb.req.getParams().getBool(StatsParams.STATS, false)) {
rb.setNeedDocSet( true ); rb.setNeedDocSet(true);
rb.doStats = true; rb.doStats = true;
rb._statsInfo = new StatsInfo(rb); rb._statsInfo = new StatsInfo(rb);
for (StatsField statsField : rb._statsInfo.getStatsFields()) { for (StatsField statsField : rb._statsInfo.getStatsFields()) {
@ -64,7 +60,7 @@ public class StatsComponent extends SearchComponent {
statsValues.put(statsField.getOutputKey(), statsField.computeLocalStatsValues(docs)); statsValues.put(statsField.getOutputKey(), statsField.computeLocalStatsValues(docs));
} }
rb.rsp.add( "stats", convertToResponse(statsValues) ); rb.rsp.add("stats", convertToResponse(statsValues));
} }
@Override @Override
@ -145,13 +141,13 @@ public class StatsComponent extends SearchComponent {
* including the esoteric "stats_fields" wrapper. * including the esoteric "stats_fields" wrapper.
*/ */
public static NamedList<NamedList<NamedList<?>>> convertToResponse public static NamedList<NamedList<NamedList<?>>> convertToResponse
(Map<String,StatsValues> statsValues) { (Map<String, StatsValues> statsValues) {
NamedList<NamedList<NamedList<?>>> stats = new SimpleOrderedMap<>(); NamedList<NamedList<NamedList<?>>> stats = new SimpleOrderedMap<>();
NamedList<NamedList<?>> stats_fields = new SimpleOrderedMap<>(); NamedList<NamedList<?>> stats_fields = new SimpleOrderedMap<>();
stats.add("stats_fields", stats_fields); stats.add("stats_fields", stats_fields);
for (Map.Entry<String,StatsValues> entry : statsValues.entrySet()) { for (Map.Entry<String, StatsValues> entry : statsValues.entrySet()) {
String key = entry.getKey(); String key = entry.getKey();
NamedList stv = entry.getValue().getStatsValues(); NamedList stv = entry.getValue().getStatsValues();
stats_fields.add(key, stv); stats_fields.add(key, stv);
@ -169,87 +165,3 @@ public class StatsComponent extends SearchComponent {
} }
} }
/**
* Models all of the information about stats needed for a single request
* @see StatsField
*/
class StatsInfo {
private final ResponseBuilder rb;
private final List<StatsField> statsFields = new ArrayList<>(7);
private final Map<String, StatsValues> distribStatsValues = new LinkedHashMap<>();
private final Map<String, StatsField> statsFieldMap = new LinkedHashMap<>();
private final Map<String, List<StatsField>> tagToStatsFields = new LinkedHashMap<>();
public StatsInfo(ResponseBuilder rb) {
this.rb = rb;
SolrParams params = rb.req.getParams();
String[] statsParams = params.getParams(StatsParams.STATS_FIELD);
if (null == statsParams) {
// no stats.field params, nothing to parse.
return;
}
for (String paramValue : statsParams) {
StatsField current = new StatsField(rb, paramValue);
statsFields.add(current);
for (String tag : current.getTagList()) {
List<StatsField> fieldList = tagToStatsFields.get(tag);
if (fieldList == null) {
fieldList = new ArrayList<>();
}
fieldList.add(current);
tagToStatsFields.put(tag, fieldList);
}
statsFieldMap.put(current.getOutputKey(), current);
distribStatsValues.put(current.getOutputKey(),
StatsValuesFactory.createStatsValues(current));
}
}
/**
* Returns an immutable list of {@link StatsField} instances
* modeling each of the {@link StatsParams#STATS_FIELD} params specified
* as part of this request
*/
public List<StatsField> getStatsFields() {
return Collections.unmodifiableList(statsFields);
}
/**
* Returns the {@link StatsField} associated with the specified (effective)
* outputKey, or null if there was no {@link StatsParams#STATS_FIELD} param
* that would corrispond with that key.
*/
public StatsField getStatsField(String outputKey) {
return statsFieldMap.get(outputKey);
}
/**
* Return immutable list of {@link StatsField} instances by string tag local parameter.
*
* @param tag tag local parameter
* @return list of stats fields
*/
public List<StatsField> getStatsFieldsByTag(String tag) {
List<StatsField> raw = tagToStatsFields.get(tag);
if (null == raw) {
return Collections.emptyList();
} else {
return Collections.unmodifiableList(raw);
}
}
/**
* Returns an immutable map of response key =&gt; {@link StatsValues}
* instances for the current distributed request.
* Depending on where we are in the process of handling this request,
* these {@link StatsValues} instances may not be complete -- but they
* will never be null.
*/
public Map<String, StatsValues> getAggregateStatsValues() {
return Collections.unmodifiableMap(distribStatsValues);
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.StatsParams;
import java.util.*;
/**
* Models all of the information about stats needed for a single request
*
* @see StatsField
*/
class StatsInfo {
private final ResponseBuilder rb;
private final List<StatsField> statsFields = new ArrayList<>(7);
private final Map<String, StatsValues> distribStatsValues = new LinkedHashMap<>();
private final Map<String, StatsField> statsFieldMap = new LinkedHashMap<>();
private final Map<String, List<StatsField>> tagToStatsFields = new LinkedHashMap<>();
public StatsInfo(ResponseBuilder rb) {
this.rb = rb;
SolrParams params = rb.req.getParams();
String[] statsParams = params.getParams(StatsParams.STATS_FIELD);
if (null == statsParams) {
// no stats.field params, nothing to parse.
return;
}
for (String paramValue : statsParams) {
StatsField current = new StatsField(rb, paramValue);
statsFields.add(current);
for (String tag : current.getTagList()) {
List<StatsField> fieldList = tagToStatsFields.get(tag);
if (fieldList == null) {
fieldList = new ArrayList<>();
}
fieldList.add(current);
tagToStatsFields.put(tag, fieldList);
}
statsFieldMap.put(current.getOutputKey(), current);
distribStatsValues.put(current.getOutputKey(),
StatsValuesFactory.createStatsValues(current));
}
}
/**
* Returns an immutable list of {@link StatsField} instances
* modeling each of the {@link StatsParams#STATS_FIELD} params specified
* as part of this request
*/
public List<StatsField> getStatsFields() {
return Collections.unmodifiableList(statsFields);
}
/**
* Returns the {@link StatsField} associated with the specified (effective)
* outputKey, or null if there was no {@link StatsParams#STATS_FIELD} param
* that would corrispond with that key.
*/
public StatsField getStatsField(String outputKey) {
return statsFieldMap.get(outputKey);
}
/**
* Return immutable list of {@link StatsField} instances by string tag local parameter.
*
* @param tag tag local parameter
* @return list of stats fields
*/
public List<StatsField> getStatsFieldsByTag(String tag) {
List<StatsField> raw = tagToStatsFields.get(tag);
if (null == raw) {
return Collections.emptyList();
} else {
return Collections.unmodifiableList(raw);
}
}
/**
* Returns an immutable map of response key =&gt; {@link StatsValues}
* instances for the current distributed request.
* Depending on where we are in the process of handling this request,
* these {@link StatsValues} instances may not be complete -- but they
* will never be null.
*/
public Map<String, StatsValues> getAggregateStatsValues() {
return Collections.unmodifiableMap(distribStatsValues);
}
}

View File

@ -19,10 +19,11 @@ package org.apache.solr.handler.export;
interface DoubleComp { interface DoubleComp {
int compare(double a, double b); int compare(double a, double b);
double resetValue();
}
class DoubleAsc implements DoubleComp { double resetValue();
static class DoubleAsc implements DoubleComp {
public double resetValue() { public double resetValue() {
return Double.MAX_VALUE; return Double.MAX_VALUE;
} }
@ -30,9 +31,9 @@ class DoubleAsc implements DoubleComp {
public int compare(double a, double b) { public int compare(double a, double b) {
return Double.compare(b, a); return Double.compare(b, a);
} }
} }
class DoubleDesc implements DoubleComp { static class DoubleDesc implements DoubleComp {
public double resetValue() { public double resetValue() {
return -Double.MAX_VALUE; return -Double.MAX_VALUE;
} }
@ -40,4 +41,5 @@ class DoubleDesc implements DoubleComp {
public int compare(double a, double b) { public int compare(double a, double b) {
return Double.compare(a, b); return Double.compare(a, b);
} }
}
} }

View File

@ -408,41 +408,41 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
if (ft instanceof IntValueFieldType) { if (ft instanceof IntValueFieldType) {
if (reverse) { if (reverse) {
sortValues[i] = new IntValue(field, new IntDesc()); sortValues[i] = new IntValue(field, new IntComp.IntDesc());
} else { } else {
sortValues[i] = new IntValue(field, new IntAsc()); sortValues[i] = new IntValue(field, new IntComp.IntAsc());
} }
} else if (ft instanceof FloatValueFieldType) { } else if (ft instanceof FloatValueFieldType) {
if (reverse) { if (reverse) {
sortValues[i] = new FloatValue(field, new FloatDesc()); sortValues[i] = new FloatValue(field, new FloatComp.FloatDesc());
} else { } else {
sortValues[i] = new FloatValue(field, new FloatAsc()); sortValues[i] = new FloatValue(field, new FloatComp.FloatAsc());
} }
} else if (ft instanceof DoubleValueFieldType) { } else if (ft instanceof DoubleValueFieldType) {
if (reverse) { if (reverse) {
sortValues[i] = new DoubleValue(field, new DoubleDesc()); sortValues[i] = new DoubleValue(field, new DoubleComp.DoubleDesc());
} else { } else {
sortValues[i] = new DoubleValue(field, new DoubleAsc()); sortValues[i] = new DoubleValue(field, new DoubleComp.DoubleAsc());
} }
} else if (ft instanceof LongValueFieldType) { } else if (ft instanceof LongValueFieldType) {
if (reverse) { if (reverse) {
sortValues[i] = new LongValue(field, new LongDesc()); sortValues[i] = new LongValue(field, new LongComp.LongDesc());
} else { } else {
sortValues[i] = new LongValue(field, new LongAsc()); sortValues[i] = new LongValue(field, new LongComp.LongAsc());
} }
} else if (ft instanceof StrField || ft instanceof SortableTextField) { } else if (ft instanceof StrField || ft instanceof SortableTextField) {
LeafReader reader = searcher.getSlowAtomicReader(); LeafReader reader = searcher.getSlowAtomicReader();
SortedDocValues vals = reader.getSortedDocValues(field); SortedDocValues vals = reader.getSortedDocValues(field);
if (reverse) { if (reverse) {
sortValues[i] = new StringValue(vals, field, new IntDesc()); sortValues[i] = new StringValue(vals, field, new IntComp.IntDesc());
} else { } else {
sortValues[i] = new StringValue(vals, field, new IntAsc()); sortValues[i] = new StringValue(vals, field, new IntComp.IntAsc());
} }
} else if (ft instanceof DateValueFieldType) { } else if (ft instanceof DateValueFieldType) {
if (reverse) { if (reverse) {
sortValues[i] = new LongValue(field, new LongDesc()); sortValues[i] = new LongValue(field, new LongComp.LongDesc());
} else { } else {
sortValues[i] = new LongValue(field, new LongAsc()); sortValues[i] = new LongValue(field, new LongComp.LongAsc());
} }
} else if (ft instanceof BoolField) { } else if (ft instanceof BoolField) {
// This is a bit of a hack, but since the boolean field stores ByteRefs, just like Strings // This is a bit of a hack, but since the boolean field stores ByteRefs, just like Strings
@ -451,9 +451,9 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
LeafReader reader = searcher.getSlowAtomicReader(); LeafReader reader = searcher.getSlowAtomicReader();
SortedDocValues vals = reader.getSortedDocValues(field); SortedDocValues vals = reader.getSortedDocValues(field);
if (reverse) { if (reverse) {
sortValues[i] = new StringValue(vals, field, new IntDesc()); sortValues[i] = new StringValue(vals, field, new IntComp.IntDesc());
} else { } else {
sortValues[i] = new StringValue(vals, field, new IntAsc()); sortValues[i] = new StringValue(vals, field, new IntComp.IntAsc());
} }
} else { } else {
throw new IOException("Sort fields must be one of the following types: int,float,long,double,string,date,boolean,SortableText"); throw new IOException("Sort fields must be one of the following types: int,float,long,double,string,date,boolean,SortableText");

View File

@ -19,10 +19,10 @@ package org.apache.solr.handler.export;
interface FloatComp { interface FloatComp {
int compare(float a, float b); int compare(float a, float b);
float resetValue();
}
class FloatAsc implements FloatComp { float resetValue();
static class FloatAsc implements FloatComp {
public float resetValue() { public float resetValue() {
return Float.MAX_VALUE; return Float.MAX_VALUE;
} }
@ -30,9 +30,9 @@ class FloatAsc implements FloatComp {
public int compare(float a, float b) { public int compare(float a, float b) {
return Float.compare(b, a); return Float.compare(b, a);
} }
} }
class FloatDesc implements FloatComp { static class FloatDesc implements FloatComp {
public float resetValue() { public float resetValue() {
return -Float.MAX_VALUE; return -Float.MAX_VALUE;
} }
@ -40,5 +40,5 @@ class FloatDesc implements FloatComp {
public int compare(float a, float b) { public int compare(float a, float b) {
return Float.compare(a, b); return Float.compare(a, b);
} }
}
} }

View File

@ -19,10 +19,11 @@ package org.apache.solr.handler.export;
public interface IntComp { public interface IntComp {
int compare(int a, int b); int compare(int a, int b);
int resetValue();
}
class IntAsc implements IntComp { int resetValue();
static class IntAsc implements IntComp {
public int resetValue() { public int resetValue() {
return Integer.MAX_VALUE; return Integer.MAX_VALUE;
@ -31,9 +32,9 @@ class IntAsc implements IntComp {
public int compare(int a, int b) { public int compare(int a, int b) {
return Integer.compare(b, a); return Integer.compare(b, a);
} }
} }
class IntDesc implements IntComp { static class IntDesc implements IntComp {
public int resetValue() { public int resetValue() {
return Integer.MIN_VALUE; return Integer.MIN_VALUE;
@ -42,4 +43,5 @@ class IntDesc implements IntComp {
public int compare(int a, int b) { public int compare(int a, int b) {
return Integer.compare(a, b); return Integer.compare(a, b);
} }
}
} }

View File

@ -19,10 +19,10 @@ package org.apache.solr.handler.export;
interface LongComp { interface LongComp {
int compare(long a, long b); int compare(long a, long b);
long resetValue();
}
class LongAsc implements LongComp { long resetValue();
static class LongAsc implements LongComp {
public long resetValue() { public long resetValue() {
return Long.MAX_VALUE; return Long.MAX_VALUE;
@ -31,9 +31,9 @@ class LongAsc implements LongComp {
public int compare(long a, long b) { public int compare(long a, long b) {
return Long.compare(b, a); return Long.compare(b, a);
} }
} }
class LongDesc implements LongComp { static class LongDesc implements LongComp {
public long resetValue() { public long resetValue() {
return Long.MIN_VALUE; return Long.MIN_VALUE;
@ -42,4 +42,5 @@ class LongDesc implements LongComp {
public int compare(long a, long b) { public int compare(long a, long b) {
return Long.compare(a, b); return Long.compare(a, b);
} }
}
} }

View File

@ -38,7 +38,7 @@ public class ShardAugmenterFactory extends TransformerFactory
v = "[not a shard request]"; v = "[not a shard request]";
} }
} }
return new ValueAugmenter( field, v ); return new ValueAugmenterFactory.ValueAugmenter( field, v );
} }
} }

View File

@ -28,31 +28,28 @@ import org.apache.solr.util.DateMathParser;
* *
* @since solr 4.0 * @since solr 4.0
*/ */
public class ValueAugmenterFactory extends TransformerFactory public class ValueAugmenterFactory extends TransformerFactory {
{
protected Object value = null; protected Object value = null;
protected Object defaultValue = null; protected Object defaultValue = null;
@Override @Override
public void init(NamedList args) { public void init(NamedList args) {
value = args.get( "value" ); value = args.get("value");
if( value == null ) { if (value == null) {
defaultValue = args.get( "defaultValue" ); defaultValue = args.get("defaultValue");
} }
} }
public static Object getObjectFrom( String val, String type ) public static Object getObjectFrom(String val, String type) {
{ if (type != null) {
if( type != null ) {
try { try {
if( "int".equals( type ) ) return Integer.valueOf( val ); if ("int".equals(type)) return Integer.valueOf(val);
if( "double".equals( type ) ) return Double.valueOf( val ); if ("double".equals(type)) return Double.valueOf(val);
if( "float".equals( type ) ) return Float.valueOf( val ); if ("float".equals(type)) return Float.valueOf(val);
if( "date".equals( type ) ) return DateMathParser.parseMath(null, val ); if ("date".equals(type)) return DateMathParser.parseMath(null, val);
} } catch (Exception ex) {
catch( Exception ex ) { throw new SolrException(ErrorCode.BAD_REQUEST,
throw new SolrException( ErrorCode.BAD_REQUEST, "Unable to parse " + type + "=" + val, ex);
"Unable to parse "+type+"="+val, ex );
} }
} }
return val; return val;
@ -61,43 +58,40 @@ public class ValueAugmenterFactory extends TransformerFactory
@Override @Override
public DocTransformer create(String field, SolrParams params, SolrQueryRequest req) { public DocTransformer create(String field, SolrParams params, SolrQueryRequest req) {
Object val = value; Object val = value;
if( val == null ) { if (val == null) {
String v = params.get("v"); String v = params.get("v");
if( v == null ) { if (v == null) {
val = defaultValue; val = defaultValue;
} } else {
else {
val = getObjectFrom(v, params.get("t")); val = getObjectFrom(v, params.get("t"));
} }
if( val == null ) { if (val == null) {
throw new SolrException( ErrorCode.BAD_REQUEST, throw new SolrException(ErrorCode.BAD_REQUEST,
"ValueAugmenter is missing a value -- should be defined in solrconfig or inline" ); "ValueAugmenter is missing a value -- should be defined in solrconfig or inline");
} }
} }
return new ValueAugmenter( field, val ); return new ValueAugmenter(field, val);
} }
}
class ValueAugmenter extends DocTransformer
{ static class ValueAugmenter extends DocTransformer {
final String name; final String name;
final Object value; final Object value;
public ValueAugmenter( String name, Object value ) public ValueAugmenter(String name, Object value) {
{
this.name = name; this.name = name;
this.value = value; this.value = value;
} }
@Override @Override
public String getName() public String getName() {
{
return name; return name;
} }
@Override @Override
public void transform(SolrDocument doc, int docid) { public void transform(SolrDocument doc, int docid) {
doc.setField( name, value ); doc.setField(name, value);
}
} }
} }

View File

@ -16,18 +16,22 @@
*/ */
package org.apache.solr.search.facet; package org.apache.solr.search.facet;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.StrUtils;
import org.apache.solr.search.FunctionQParser;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.IndexSchema;
import org.apache.solr.search.FunctionQParser; import org.apache.solr.search.QParser;
import org.apache.solr.search.SyntaxError; import org.apache.solr.search.SyntaxError;
import java.util.ArrayList; import static org.apache.solr.common.params.CommonParams.SORT;
import java.util.List;
import java.util.Map;
abstract class FacetParser<FacetRequestT extends FacetRequest> { abstract class FacetParser<FacetRequestT extends FacetRequest> {
protected FacetRequestT facet; protected FacetRequestT facet;
@ -134,9 +138,9 @@ abstract class FacetParser<FacetRequestT extends FacetRequest> {
switch (type) { switch (type) {
case "field": case "field":
case "terms": case "terms":
return new FacetRequest.FacetFieldParser(this, key).parse(args); return new FacetFieldParser(this, key).parse(args);
case "query": case "query":
return new FacetRequest.FacetQueryParser(this, key).parse(args); return new FacetQueryParser(this, key).parse(args);
case "range": case "range":
return new FacetRangeParser(this, key).parse(args); return new FacetRangeParser(this, key).parse(args);
case "heatmap": case "heatmap":
@ -411,4 +415,223 @@ abstract class FacetParser<FacetRequestT extends FacetRequest> {
nl.addAll(jsonObject); nl.addAll(jsonObject);
return SolrParams.toSolrParams(nl); return SolrParams.toSolrParams(nl);
} }
// TODO Make this private (or at least not static) and introduce
// a newInstance method on FacetParser that returns one of these?
static class FacetTopParser extends FacetParser<FacetQuery> {
private SolrQueryRequest req;
public FacetTopParser(SolrQueryRequest req) {
super(null, "facet");
this.facet = new FacetQuery();
this.req = req;
}
@Override
public FacetQuery parse(Object args) throws SyntaxError {
parseSubs(args);
return facet;
}
@Override
public SolrQueryRequest getSolrRequest() {
return req;
}
@Override
public IndexSchema getSchema() {
return req.getSchema();
}
}
static class FacetQueryParser extends FacetParser<FacetQuery> {
public FacetQueryParser(@SuppressWarnings("rawtypes") FacetParser parent, String key) {
super(parent, key);
facet = new FacetQuery();
}
@Override
public FacetQuery parse(Object arg) throws SyntaxError {
parseCommonParams(arg);
String qstring = null;
if (arg instanceof String) {
// just the field name...
qstring = (String)arg;
} else if (arg instanceof Map) {
@SuppressWarnings({"unchecked"})
Map<String, Object> m = (Map<String, Object>) arg;
qstring = getString(m, "q", null);
if (qstring == null) {
qstring = getString(m, "query", null);
}
// OK to parse subs before we have parsed our own query?
// as long as subs don't need to know about it.
parseSubs( m.get("facet") );
} else if (arg != null) {
// something lke json.facet.facet.query=2
throw err("Expected string/map for facet query, received " + arg.getClass().getSimpleName() + "=" + arg);
}
// TODO: substats that are from defaults!!!
if (qstring != null) {
QParser parser = QParser.getParser(qstring, getSolrRequest());
parser.setIsFilter(true);
facet.q = parser.getQuery();
}
return facet;
}
}
/*** not a separate type of parser for now...
static class FacetBlockParentParser extends FacetParser<FacetBlockParent> {
public FacetBlockParentParser(FacetParser parent, String key) {
super(parent, key);
facet = new FacetBlockParent();
}
@Override
public FacetBlockParent parse(Object arg) throws SyntaxError {
parseCommonParams(arg);
if (arg instanceof String) {
// just the field name...
facet.parents = (String)arg;
} else if (arg instanceof Map) {
Map<String, Object> m = (Map<String, Object>) arg;
facet.parents = getString(m, "parents", null);
parseSubs( m.get("facet") );
}
return facet;
}
}
***/
static class FacetFieldParser extends FacetParser<FacetField> {
@SuppressWarnings({"rawtypes"})
public FacetFieldParser(FacetParser parent, String key) {
super(parent, key);
facet = new FacetField();
}
public FacetField parse(Object arg) throws SyntaxError {
parseCommonParams(arg);
if (arg instanceof String) {
// just the field name...
facet.field = (String)arg;
} else if (arg instanceof Map) {
@SuppressWarnings({"unchecked"})
Map<String, Object> m = (Map<String, Object>) arg;
facet.field = getField(m);
facet.offset = getLong(m, "offset", facet.offset);
facet.limit = getLong(m, "limit", facet.limit);
facet.overrequest = (int) getLong(m, "overrequest", facet.overrequest);
facet.overrefine = (int) getLong(m, "overrefine", facet.overrefine);
if (facet.limit == 0) facet.offset = 0; // normalize. an offset with a limit of non-zero isn't useful.
facet.mincount = getLong(m, "mincount", facet.mincount);
facet.missing = getBoolean(m, "missing", facet.missing);
facet.numBuckets = getBoolean(m, "numBuckets", facet.numBuckets);
facet.prefix = getString(m, "prefix", facet.prefix);
facet.allBuckets = getBoolean(m, "allBuckets", facet.allBuckets);
facet.method = FacetField.FacetMethod.fromString(getString(m, "method", null));
facet.cacheDf = (int)getLong(m, "cacheDf", facet.cacheDf);
// TODO: pull up to higher level?
facet.refine = FacetRequest.RefineMethod.fromObj(m.get("refine"));
facet.perSeg = getBooleanOrNull(m, "perSeg");
// facet.sort may depend on a facet stat...
// should we be parsing / validating this here, or in the execution environment?
Object o = m.get("facet");
parseSubs(o);
facet.sort = parseAndValidateSort(facet, m, SORT);
facet.prelim_sort = parseAndValidateSort(facet, m, "prelim_sort");
} else if (arg != null) {
// something like json.facet.facet.field=2
throw err("Expected string/map for facet field, received " + arg.getClass().getSimpleName() + "=" + arg);
}
if (null == facet.sort) {
facet.sort = FacetRequest.FacetSort.COUNT_DESC;
}
return facet;
}
/**
* Parses, validates and returns the {@link FacetRequest.FacetSort} for given sortParam
* and facet field
* <p>
* Currently, supported sort specifications are 'mystat desc' OR {mystat: 'desc'}
* index - This is equivalent to 'index asc'
* count - This is equivalent to 'count desc'
* </p>
*
* @param facet {@link FacetField} for which sort needs to be parsed and validated
* @param args map containing the sortVal for given sortParam
* @param sortParam parameter for which sort needs to parsed and validated
* @return parsed facet sort
*/
private static FacetRequest.FacetSort parseAndValidateSort(FacetField facet, Map<String, Object> args, String sortParam) {
Object sort = args.get(sortParam);
if (sort == null) {
return null;
}
FacetRequest.FacetSort facetSort = null;
if (sort instanceof String) {
String sortStr = (String)sort;
if (sortStr.endsWith(" asc")) {
facetSort = new FacetRequest.FacetSort(sortStr.substring(0, sortStr.length()-" asc".length()),
FacetRequest.SortDirection.asc);
} else if (sortStr.endsWith(" desc")) {
facetSort = new FacetRequest.FacetSort(sortStr.substring(0, sortStr.length()-" desc".length()),
FacetRequest.SortDirection.desc);
} else {
facetSort = new FacetRequest.FacetSort(sortStr,
// default direction for "index" is ascending
("index".equals(sortStr)
? FacetRequest.SortDirection.asc
: FacetRequest.SortDirection.desc));
}
} else if (sort instanceof Map) {
// { myvar : 'desc' }
@SuppressWarnings("unchecked")
Optional<Map.Entry<String,Object>> optional = ((Map<String,Object>)sort).entrySet().stream().findFirst();
if (optional.isPresent()) {
Map.Entry<String, Object> entry = optional.get();
facetSort = new FacetRequest.FacetSort(entry.getKey(), FacetRequest.SortDirection.fromObj(entry.getValue()));
}
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Expected string/map for '" + sortParam +"', received "+ sort.getClass().getSimpleName() + "=" + sort);
}
Map<String, AggValueSource> facetStats = facet.facetStats;
// validate facet sort
boolean isValidSort = facetSort == null ||
"index".equals(facetSort.sortVariable) ||
"count".equals(facetSort.sortVariable) ||
(facetStats != null && facetStats.containsKey(facetSort.sortVariable));
if (!isValidSort) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Invalid " + sortParam + " option '" + sort + "' for field '" + facet.field + "'");
}
return facetSort;
}
}
} }

View File

@ -21,16 +21,13 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.search.DocSet; import org.apache.solr.search.DocSet;
import org.apache.solr.search.JoinQParserPlugin; import org.apache.solr.search.JoinQParserPlugin;
import org.apache.solr.search.QParser;
import org.apache.solr.search.QueryContext; import org.apache.solr.search.QueryContext;
import org.apache.solr.search.SolrConstantScoreQuery; import org.apache.solr.search.SolrConstantScoreQuery;
import org.apache.solr.search.SyntaxError; import org.apache.solr.search.SyntaxError;
@ -38,7 +35,6 @@ import org.apache.solr.search.join.GraphQuery;
import org.apache.solr.search.join.GraphQueryParser; import org.apache.solr.search.join.GraphQueryParser;
import org.apache.solr.util.RTimer; import org.apache.solr.util.RTimer;
import static org.apache.solr.common.params.CommonParams.SORT;
import static org.apache.solr.search.facet.FacetRequest.RefineMethod.NONE; import static org.apache.solr.search.facet.FacetRequest.RefineMethod.NONE;
/** /**
@ -302,7 +298,7 @@ public abstract class FacetRequest {
*/ */
public static FacetRequest parse(SolrQueryRequest req, Map<String, Object> params) { public static FacetRequest parse(SolrQueryRequest req, Map<String, Object> params) {
@SuppressWarnings({"rawtypes"}) @SuppressWarnings({"rawtypes"})
FacetParser parser = new FacetTopParser(req); FacetParser parser = new FacetParser.FacetTopParser(req);
try { try {
return parser.parse(params); return parser.parse(params);
} catch (SyntaxError syntaxError) { } catch (SyntaxError syntaxError) {
@ -321,7 +317,7 @@ public abstract class FacetRequest {
*/ */
public static FacetRequest parseOneFacetReq(SolrQueryRequest req, Map<String, Object> params) { public static FacetRequest parseOneFacetReq(SolrQueryRequest req, Map<String, Object> params) {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
FacetParser parser = new FacetTopParser(req); FacetParser parser = new FacetParser.FacetTopParser(req);
try { try {
return (FacetRequest) parser.parseFacetOrStat("", params); return (FacetRequest) parser.parseFacetOrStat("", params);
} catch (SyntaxError syntaxError) { } catch (SyntaxError syntaxError) {
@ -437,221 +433,6 @@ public abstract class FacetRequest {
public abstract Map<String, Object> getFacetDescription(); public abstract Map<String, Object> getFacetDescription();
static class FacetTopParser extends FacetParser<FacetQuery> {
private SolrQueryRequest req;
public FacetTopParser(SolrQueryRequest req) {
super(null, "facet");
this.facet = new FacetQuery();
this.req = req;
}
@Override
public FacetQuery parse(Object args) throws SyntaxError {
parseSubs(args);
return facet;
}
@Override
public SolrQueryRequest getSolrRequest() {
return req;
}
@Override
public IndexSchema getSchema() {
return req.getSchema();
}
}
static class FacetQueryParser extends FacetParser<FacetQuery> {
public FacetQueryParser(@SuppressWarnings("rawtypes") FacetParser parent, String key) {
super(parent, key);
facet = new FacetQuery();
}
@Override
public FacetQuery parse(Object arg) throws SyntaxError {
parseCommonParams(arg);
String qstring = null;
if (arg instanceof String) {
// just the field name...
qstring = (String)arg;
} else if (arg instanceof Map) {
@SuppressWarnings({"unchecked"})
Map<String, Object> m = (Map<String, Object>) arg;
qstring = getString(m, "q", null);
if (qstring == null) {
qstring = getString(m, "query", null);
}
// OK to parse subs before we have parsed our own query?
// as long as subs don't need to know about it.
parseSubs( m.get("facet") );
} else if (arg != null) {
// something lke json.facet.facet.query=2
throw err("Expected string/map for facet query, received " + arg.getClass().getSimpleName() + "=" + arg);
}
// TODO: substats that are from defaults!!!
if (qstring != null) {
QParser parser = QParser.getParser(qstring, getSolrRequest());
parser.setIsFilter(true);
facet.q = parser.getQuery();
}
return facet;
}
}
/*** not a separate type of parser for now...
static class FacetBlockParentParser extends FacetParser<FacetBlockParent> {
public FacetBlockParentParser(FacetParser parent, String key) {
super(parent, key);
facet = new FacetBlockParent();
}
@Override
public FacetBlockParent parse(Object arg) throws SyntaxError {
parseCommonParams(arg);
if (arg instanceof String) {
// just the field name...
facet.parents = (String)arg;
} else if (arg instanceof Map) {
Map<String, Object> m = (Map<String, Object>) arg;
facet.parents = getString(m, "parents", null);
parseSubs( m.get("facet") );
}
return facet;
}
}
***/
static class FacetFieldParser extends FacetParser<FacetField> {
@SuppressWarnings({"rawtypes"})
public FacetFieldParser(FacetParser parent, String key) {
super(parent, key);
facet = new FacetField();
}
public FacetField parse(Object arg) throws SyntaxError {
parseCommonParams(arg);
if (arg instanceof String) {
// just the field name...
facet.field = (String)arg;
} else if (arg instanceof Map) {
@SuppressWarnings({"unchecked"})
Map<String, Object> m = (Map<String, Object>) arg;
facet.field = getField(m);
facet.offset = getLong(m, "offset", facet.offset);
facet.limit = getLong(m, "limit", facet.limit);
facet.overrequest = (int) getLong(m, "overrequest", facet.overrequest);
facet.overrefine = (int) getLong(m, "overrefine", facet.overrefine);
if (facet.limit == 0) facet.offset = 0; // normalize. an offset with a limit of non-zero isn't useful.
facet.mincount = getLong(m, "mincount", facet.mincount);
facet.missing = getBoolean(m, "missing", facet.missing);
facet.numBuckets = getBoolean(m, "numBuckets", facet.numBuckets);
facet.prefix = getString(m, "prefix", facet.prefix);
facet.allBuckets = getBoolean(m, "allBuckets", facet.allBuckets);
facet.method = FacetField.FacetMethod.fromString(getString(m, "method", null));
facet.cacheDf = (int)getLong(m, "cacheDf", facet.cacheDf);
// TODO: pull up to higher level?
facet.refine = RefineMethod.fromObj(m.get("refine"));
facet.perSeg = getBooleanOrNull(m, "perSeg");
// facet.sort may depend on a facet stat...
// should we be parsing / validating this here, or in the execution environment?
Object o = m.get("facet");
parseSubs(o);
facet.sort = parseAndValidateSort(facet, m, SORT);
facet.prelim_sort = parseAndValidateSort(facet, m, "prelim_sort");
} else if (arg != null) {
// something like json.facet.facet.field=2
throw err("Expected string/map for facet field, received " + arg.getClass().getSimpleName() + "=" + arg);
}
if (null == facet.sort) {
facet.sort = FacetSort.COUNT_DESC;
}
return facet;
}
/**
* Parses, validates and returns the {@link FacetSort} for given sortParam
* and facet field
* <p>
* Currently, supported sort specifications are 'mystat desc' OR {mystat: 'desc'}
* index - This is equivalent to 'index asc'
* count - This is equivalent to 'count desc'
* </p>
*
* @param facet {@link FacetField} for which sort needs to be parsed and validated
* @param args map containing the sortVal for given sortParam
* @param sortParam parameter for which sort needs to parsed and validated
* @return parsed facet sort
*/
private static FacetSort parseAndValidateSort(FacetField facet, Map<String, Object> args, String sortParam) {
Object sort = args.get(sortParam);
if (sort == null) {
return null;
}
FacetSort facetSort = null;
if (sort instanceof String) {
String sortStr = (String)sort;
if (sortStr.endsWith(" asc")) {
facetSort = new FacetSort(sortStr.substring(0, sortStr.length()-" asc".length()),
SortDirection.asc);
} else if (sortStr.endsWith(" desc")) {
facetSort = new FacetSort(sortStr.substring(0, sortStr.length()-" desc".length()),
SortDirection.desc);
} else {
facetSort = new FacetSort(sortStr,
// default direction for "index" is ascending
("index".equals(sortStr)
? SortDirection.asc
: SortDirection.desc));
}
} else if (sort instanceof Map) {
// { myvar : 'desc' }
@SuppressWarnings("unchecked")
Optional<Map.Entry<String,Object>> optional = ((Map<String,Object>)sort).entrySet().stream().findFirst();
if (optional.isPresent()) {
Map.Entry<String, Object> entry = optional.get();
facetSort = new FacetSort(entry.getKey(), SortDirection.fromObj(entry.getValue()));
}
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Expected string/map for '" + sortParam +"', received "+ sort.getClass().getSimpleName() + "=" + sort);
}
Map<String, AggValueSource> facetStats = facet.facetStats;
// validate facet sort
boolean isValidSort = facetSort == null ||
"index".equals(facetSort.sortVariable) ||
"count".equals(facetSort.sortVariable) ||
(facetStats != null && facetStats.containsKey(facetSort.sortVariable));
if (!isValidSort) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Invalid " + sortParam + " option '" + sort + "' for field '" + facet.field + "'");
}
return facetSort;
}
}
} }

View File

@ -53,7 +53,7 @@ abstract class GraphEdgeCollector extends SimpleCollector implements Collector {
// known leaf nodes // known leaf nodes
DocSet leafNodes; DocSet leafNodes;
int numHits=0; // number of documents visited int numHits = 0; // number of documents visited
BitSet bits; // if not null, used to collect documents visited BitSet bits; // if not null, used to collect documents visited
int base; int base;
@ -74,7 +74,9 @@ abstract class GraphEdgeCollector extends SimpleCollector implements Collector {
} }
// the number of docs visited // the number of docs visited
public int getNumHits() { return numHits; } public int getNumHits() {
return numHits;
}
public void collect(int segDoc) throws IOException { public void collect(int segDoc) throws IOException {
int doc = segDoc + base; int doc = segDoc + base;
@ -116,9 +118,8 @@ abstract class GraphEdgeCollector extends SimpleCollector implements Collector {
return ScoreMode.COMPLETE_NO_SCORES; return ScoreMode.COMPLETE_NO_SCORES;
} }
}
class GraphTermsCollector extends GraphEdgeCollector { static class GraphTermsCollector extends GraphEdgeCollector {
// all the collected terms // all the collected terms
private BytesRefHash collectorTerms; private BytesRefHash collectorTerms;
private SortedSetDocValues docTermOrds; private SortedSetDocValues docTermOrds;
@ -170,7 +171,7 @@ class GraphTermsCollector extends GraphEdgeCollector {
q = autnQuery; q = autnQuery;
} else { } else {
List<BytesRef> termList = new ArrayList<>(collectorTerms.size()); List<BytesRef> termList = new ArrayList<>(collectorTerms.size());
for (int i = 0 ; i < collectorTerms.size(); i++) { for (int i = 0; i < collectorTerms.size(); i++) {
BytesRef ref = new BytesRef(); BytesRef ref = new BytesRef();
collectorTerms.get(i, ref); collectorTerms.get(i, ref);
termList.add(ref); termList.add(ref);
@ -185,11 +186,13 @@ class GraphTermsCollector extends GraphEdgeCollector {
} }
/** Build an automaton to represent the frontier query */ /**
* Build an automaton to represent the frontier query
*/
private Automaton buildAutomaton(BytesRefHash termBytesHash) { private Automaton buildAutomaton(BytesRefHash termBytesHash) {
// need top pass a sorted set of terms to the autn builder (maybe a better way to avoid this?) // need top pass a sorted set of terms to the autn builder (maybe a better way to avoid this?)
final TreeSet<BytesRef> terms = new TreeSet<BytesRef>(); final TreeSet<BytesRef> terms = new TreeSet<BytesRef>();
for (int i = 0 ; i < termBytesHash.size(); i++) { for (int i = 0; i < termBytesHash.size(); i++) {
BytesRef ref = new BytesRef(); BytesRef ref = new BytesRef();
termBytesHash.get(i, ref); termBytesHash.get(i, ref);
terms.add(ref); terms.add(ref);
@ -197,6 +200,8 @@ class GraphTermsCollector extends GraphEdgeCollector {
final Automaton a = DaciukMihovAutomatonBuilder.build(terms); final Automaton a = DaciukMihovAutomatonBuilder.build(terms);
return a; return a;
} }
}
} }

View File

@ -200,7 +200,7 @@ public class GraphQuery extends Query {
// Create the graph result collector for this level // Create the graph result collector for this level
GraphEdgeCollector graphResultCollector = collectSchemaField.getType().isPointField() GraphEdgeCollector graphResultCollector = collectSchemaField.getType().isPointField()
? new GraphPointsCollector(collectSchemaField, new BitDocSet(resultBits), leafNodes) ? new GraphPointsCollector(collectSchemaField, new BitDocSet(resultBits), leafNodes)
: new GraphTermsCollector(collectSchemaField, new BitDocSet(resultBits), leafNodes); : new GraphEdgeCollector.GraphTermsCollector(collectSchemaField, new BitDocSet(resultBits), leafNodes);
fromSet = new BitDocSet(new FixedBitSet(capacity)); fromSet = new BitDocSet(new FixedBitSet(capacity));
graphResultCollector.setCollectDocs(fromSet.getBits()); graphResultCollector.setCollectDocs(fromSet.getBits());

View File

@ -69,7 +69,7 @@ public class TransactionLog implements Closeable {
private boolean debug = log.isDebugEnabled(); private boolean debug = log.isDebugEnabled();
private boolean trace = log.isTraceEnabled(); private boolean trace = log.isTraceEnabled();
public final static String END_MESSAGE="SOLR_TLOG_END"; public final static String END_MESSAGE = "SOLR_TLOG_END";
long id; long id;
File tlogFile; File tlogFile;
@ -83,7 +83,7 @@ public class TransactionLog implements Closeable {
protected volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery) protected volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
AtomicInteger refcount = new AtomicInteger(1); AtomicInteger refcount = new AtomicInteger(1);
Map<String,Integer> globalStringMap = new HashMap<>(); Map<String, Integer> globalStringMap = new HashMap<>();
List<String> globalStringList = new ArrayList<>(); List<String> globalStringList = new ArrayList<>();
// write a BytesRef as a byte array // write a BytesRef as a byte array
@ -91,7 +91,7 @@ public class TransactionLog implements Closeable {
@Override @Override
public Object resolve(Object o, JavaBinCodec codec) throws IOException { public Object resolve(Object o, JavaBinCodec codec) throws IOException {
if (o instanceof BytesRef) { if (o instanceof BytesRef) {
BytesRef br = (BytesRef)o; BytesRef br = (BytesRef) o;
codec.writeByteArray(br.bytes, br.offset, br.length); codec.writeByteArray(br.bytes, br.offset, br.length);
return null; return null;
} }
@ -172,7 +172,7 @@ public class TransactionLog implements Closeable {
// Parse tlog id from the filename // Parse tlog id from the filename
String filename = tlogFile.getName(); String filename = tlogFile.getName();
id = Long.parseLong(filename.substring(filename.lastIndexOf('.')+1)); id = Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1));
this.tlogFile = tlogFile; this.tlogFile = tlogFile;
raf = new RandomAccessFile(this.tlogFile, "rw"); raf = new RandomAccessFile(this.tlogFile, "rw");
@ -222,7 +222,8 @@ public class TransactionLog implements Closeable {
} }
// for subclasses // for subclasses
protected TransactionLog() {} protected TransactionLog() {
}
/** Returns the number of records in the log (currently includes the header and an optional commit). /** Returns the number of records in the log (currently includes the header and an optional commit).
* Note: currently returns 0 for reopened existing log files. * Note: currently returns 0 for reopened existing log files.
@ -241,12 +242,12 @@ public class TransactionLog implements Closeable {
} }
// the end of the file should have the end message (added during a commit) plus a 4 byte size // the end of the file should have the end message (added during a commit) plus a 4 byte size
byte[] buf = new byte[ END_MESSAGE.length() ]; byte[] buf = new byte[END_MESSAGE.length()];
long pos = size - END_MESSAGE.length() - 4; long pos = size - END_MESSAGE.length() - 4;
if (pos < 0) return false; if (pos < 0) return false;
@SuppressWarnings("resource") final ChannelFastInputStream is = new ChannelFastInputStream(channel, pos); @SuppressWarnings("resource") final ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
is.read(buf); is.read(buf);
for (int i=0; i<buf.length; i++) { for (int i = 0; i < buf.length; i++) {
if (buf[i] != END_MESSAGE.charAt(i)) return false; if (buf[i] != END_MESSAGE.charAt(i)) return false;
} }
return true; return true;
@ -269,17 +270,17 @@ public class TransactionLog implements Closeable {
// read existing header // read existing header
fis = fis != null ? fis : new ChannelFastInputStream(channel, 0); fis = fis != null ? fis : new ChannelFastInputStream(channel, 0);
@SuppressWarnings("resource") final LogCodec codec = new LogCodec(resolver); @SuppressWarnings("resource") final LogCodec codec = new LogCodec(resolver);
Map header = (Map)codec.unmarshal(fis); Map header = (Map) codec.unmarshal(fis);
fis.readInt(); // skip size fis.readInt(); // skip size
// needed to read other records // needed to read other records
synchronized (this) { synchronized (this) {
globalStringList = (List<String>)header.get("strings"); globalStringList = (List<String>) header.get("strings");
globalStringMap = new HashMap<>(globalStringList.size()); globalStringMap = new HashMap<>(globalStringList.size());
for (int i=0; i<globalStringList.size(); i++) { for (int i = 0; i < globalStringList.size(); i++) {
globalStringMap.put( globalStringList.get(i), i+1); globalStringMap.put(globalStringList.get(i), i + 1);
} }
} }
} }
@ -309,16 +310,16 @@ public class TransactionLog implements Closeable {
long pos = fos.size(); long pos = fos.size();
assert pos == 0; assert pos == 0;
Map header = new LinkedHashMap<String,Object>(); Map header = new LinkedHashMap<String, Object>();
header.put("SOLR_TLOG",1); // a magic string + version number header.put("SOLR_TLOG", 1); // a magic string + version number
header.put("strings",globalStringList); header.put("strings", globalStringList);
codec.marshal(header, fos); codec.marshal(header, fos);
endRecord(pos); endRecord(pos);
} }
protected void endRecord(long startRecordPosition) throws IOException { protected void endRecord(long startRecordPosition) throws IOException {
fos.writeInt((int)(fos.size() - startRecordPosition)); fos.writeInt((int) (fos.size() - startRecordPosition));
numRecords++; numRecords++;
} }
@ -374,7 +375,7 @@ public class TransactionLog implements Closeable {
// adaptive buffer sizing // adaptive buffer sizing
int bufSize = lastAddSize; // unsynchronized access of lastAddSize should be fine int bufSize = lastAddSize; // unsynchronized access of lastAddSize should be fine
// at least 256 bytes and at most 1 MB // at least 256 bytes and at most 1 MB
bufSize = Math.min(1024*1024, Math.max(256, bufSize+(bufSize>>3)+256)); bufSize = Math.min(1024 * 1024, Math.max(256, bufSize + (bufSize >> 3) + 256));
MemOutputStream out = new MemOutputStream(new byte[bufSize]); MemOutputStream out = new MemOutputStream(new byte[bufSize]);
codec.init(out); codec.init(out);
@ -391,7 +392,7 @@ public class TransactionLog implements Closeable {
codec.writeLong(cmd.getVersion()); codec.writeLong(cmd.getVersion());
codec.writeSolrInputDocument(cmd.getSolrInputDocument()); codec.writeSolrInputDocument(cmd.getSolrInputDocument());
} }
lastAddSize = (int)out.size(); lastAddSize = (int) out.size();
synchronized (this) { synchronized (this) {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position() long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
@ -633,7 +634,8 @@ public class TransactionLog implements Closeable {
/** Returns a reader that can be used while a log is still in use. /** Returns a reader that can be used while a log is still in use.
* Currently only *one* LogReader may be outstanding, and that log may only * Currently only *one* LogReader may be outstanding, and that log may only
* be used from a single thread. */ * be used from a single thread.
*/
public LogReader getReader(long startingPos) { public LogReader getReader(long startingPos) {
return new LogReader(startingPos); return new LogReader(startingPos);
} }
@ -744,7 +746,7 @@ public class TransactionLog implements Closeable {
long pos = startingPos; long pos = startingPos;
long lastVersion = Long.MIN_VALUE; long lastVersion = Long.MIN_VALUE;
while ( (o = super.next()) != null) { while ((o = super.next()) != null) {
List entry = (List) o; List entry = (List) o;
long version = (Long) entry.get(UpdateLog.VERSION_IDX); long version = (Long) entry.get(UpdateLog.VERSION_IDX);
version = Math.abs(version); version = Math.abs(version);
@ -780,10 +782,11 @@ public class TransactionLog implements Closeable {
/* returns the position in the log file of the last record returned by next() */ /* returns the position in the log file of the last record returned by next() */
public abstract long position(); public abstract long position();
public abstract void close(); public abstract void close();
@Override @Override
public abstract String toString() ; public abstract String toString();
} }
@ -812,7 +815,7 @@ public class TransactionLog implements Closeable {
} }
fis = new ChannelFastInputStream(channel, 0); fis = new ChannelFastInputStream(channel, 0);
if (sz >=4) { if (sz >= 4) {
// readHeader(fis); // should not be needed // readHeader(fis); // should not be needed
prevPos = sz - 4; prevPos = sz - 4;
fis.seek(prevPos); fis.seek(prevPos);
@ -880,11 +883,7 @@ public class TransactionLog implements Closeable {
} }
} static class ChannelFastInputStream extends FastInputStream {
class ChannelFastInputStream extends FastInputStream {
private FileChannel ch; private FileChannel ch;
public ChannelFastInputStream(FileChannel ch, long chPosition) { public ChannelFastInputStream(FileChannel ch, long chPosition) {
@ -904,7 +903,7 @@ class ChannelFastInputStream extends FastInputStream {
public void seek(long position) throws IOException { public void seek(long position) throws IOException {
if (position <= readFromStream && position >= getBufferPos()) { if (position <= readFromStream && position >= getBufferPos()) {
// seek within buffer // seek within buffer
pos = (int)(position - getBufferPos()); pos = (int) (position - getBufferPos());
} else { } else {
// long currSize = ch.size(); // not needed - underlying read should handle (unless read never done) // long currSize = ch.size(); // not needed - underlying read should handle (unless read never done)
// if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch); // if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch);
@ -930,7 +929,8 @@ class ChannelFastInputStream extends FastInputStream {
@Override @Override
public String toString() { public String toString() {
return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ; return "readFromStream=" + readFromStream + " pos=" + pos + " end=" + end + " bufferPos=" + getBufferPos() + " position=" + position();
}
} }
} }

View File

@ -131,7 +131,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
while (nextInChain != null) { while (nextInChain != null) {
Class<? extends UpdateRequestProcessor> klass = nextInChain.getClass(); Class<? extends UpdateRequestProcessor> klass = nextInChain.getClass();
if (klass != LogUpdateProcessorFactory.LogUpdateProcessor.class if (klass != LogUpdateProcessorFactory.LogUpdateProcessor.class
&& klass != RunUpdateProcessor.class && klass != RunUpdateProcessorFactory.RunUpdateProcessor.class
&& klass != TolerantUpdateProcessor.class) { && klass != TolerantUpdateProcessor.class) {
shouldClone = true; shouldClone = true;
break; break;

View File

@ -33,14 +33,12 @@ import org.apache.solr.update.*;
* @since solr 1.3 * @since solr 1.3
* @see DistributingUpdateProcessorFactory * @see DistributingUpdateProcessorFactory
*/ */
public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory {
{
public static final String PRE_RUN_CHAIN_NAME = "_preRun_"; public static final String PRE_RUN_CHAIN_NAME = "_preRun_";
@Override @Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
{
RunUpdateProcessor runUpdateProcessor = new RunUpdateProcessor(req, next); RunUpdateProcessor runUpdateProcessor = new RunUpdateProcessor(req, next);
UpdateRequestProcessorChain preRun = req.getCore().getUpdateProcessingChain(PRE_RUN_CHAIN_NAME); UpdateRequestProcessorChain preRun = req.getCore().getUpdateProcessingChain(PRE_RUN_CHAIN_NAME);
if (preRun != null) { if (preRun != null) {
@ -49,17 +47,16 @@ public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory
return runUpdateProcessor; return runUpdateProcessor;
} }
} }
}
class RunUpdateProcessor extends UpdateRequestProcessor
{ static class RunUpdateProcessor extends UpdateRequestProcessor {
private final SolrQueryRequest req; private final SolrQueryRequest req;
private final UpdateHandler updateHandler; private final UpdateHandler updateHandler;
private boolean changesSinceCommit = false; private boolean changesSinceCommit = false;
public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) { public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) {
super( next ); super(next);
this.req = req; this.req = req;
this.updateHandler = req.getCore().getUpdateHandler(); this.updateHandler = req.getCore().getUpdateHandler();
} }
@ -80,10 +77,9 @@ class RunUpdateProcessor extends UpdateRequestProcessor
@Override @Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException { public void processDelete(DeleteUpdateCommand cmd) throws IOException {
if( cmd.isDeleteById()) { if (cmd.isDeleteById()) {
updateHandler.delete(cmd); updateHandler.delete(cmd);
} } else {
else {
updateHandler.deleteByQuery(cmd); updateHandler.deleteByQuery(cmd);
} }
super.processDelete(cmd); super.processDelete(cmd);
@ -97,8 +93,7 @@ class RunUpdateProcessor extends UpdateRequestProcessor
} }
@Override @Override
public void processCommit(CommitUpdateCommand cmd) throws IOException public void processCommit(CommitUpdateCommand cmd) throws IOException {
{
updateHandler.commit(cmd); updateHandler.commit(cmd);
super.processCommit(cmd); super.processCommit(cmd);
if (!cmd.softCommit) { if (!cmd.softCommit) {
@ -111,8 +106,7 @@ class RunUpdateProcessor extends UpdateRequestProcessor
* @since Solr 1.4 * @since Solr 1.4
*/ */
@Override @Override
public void processRollback(RollbackUpdateCommand cmd) throws IOException public void processRollback(RollbackUpdateCommand cmd) throws IOException {
{
updateHandler.rollback(cmd); updateHandler.rollback(cmd);
super.processRollback(cmd); super.processRollback(cmd);
changesSinceCommit = false; changesSinceCommit = false;
@ -126,6 +120,7 @@ class RunUpdateProcessor extends UpdateRequestProcessor
} }
super.finish(); super.finish();
} }
}
} }

View File

@ -136,7 +136,7 @@ public class TestJsonFacetRefinement extends SolrTestCaseHS {
try { try {
int nShards = responsesAndTests.length / 2; int nShards = responsesAndTests.length / 2;
Object jsonFacet = Utils.fromJSONString(facet); Object jsonFacet = Utils.fromJSONString(facet);
FacetParser parser = new FacetRequest.FacetTopParser(req); FacetParser parser = new FacetParser.FacetTopParser(req);
FacetRequest facetRequest = parser.parse(jsonFacet); FacetRequest facetRequest = parser.parse(jsonFacet);
FacetMerger merger = null; FacetMerger merger = null;

View File

@ -162,7 +162,7 @@ public class UpdateRequestProcessorFactoryTest extends SolrTestCaseJ4 {
// for these 3 (distrib) chains, the last proc should always be RunUpdateProcessor // for these 3 (distrib) chains, the last proc should always be RunUpdateProcessor
assertTrue(name + " (distrib) last processor isn't a RunUpdateProcessor: " + procs.toString(), assertTrue(name + " (distrib) last processor isn't a RunUpdateProcessor: " + procs.toString(),
procs.get(procs.size()-1) instanceof RunUpdateProcessor ); procs.get(procs.size()-1) instanceof RunUpdateProcessorFactory.RunUpdateProcessor );
// either 1 proc was droped in distrib mode, or 1 for the "implicit" chain // either 1 proc was droped in distrib mode, or 1 for the "implicit" chain

View File

@ -225,10 +225,9 @@ public class CharArr implements CharSequence, Appendable {
write(c); write(c);
return this; return this;
} }
}
class NullCharArr extends CharArr { static class NullCharArr extends CharArr {
public NullCharArr() { public NullCharArr() {
super(new char[1], 0, 0); super(new char[1], 0, 0);
} }
@ -274,11 +273,11 @@ class NullCharArr extends CharArr {
@Override @Override
public void write(String s, int stringOffset, int len) { public void write(String s, int stringOffset, int len) {
} }
} }
// IDEA: a subclass that refills the array from a reader? // IDEA: a subclass that refills the array from a reader?
class CharArrReader extends CharArr { class CharArrReader extends CharArr {
protected final Reader in; protected final Reader in;
public CharArrReader(Reader in, int size) { public CharArrReader(Reader in, int size) {
@ -325,10 +324,10 @@ class CharArrReader extends CharArr {
return sz; return sz;
} }
} }
class CharArrWriter extends CharArr { class CharArrWriter extends CharArr {
protected Writer sink; protected Writer sink;
@Override @Override
@ -391,4 +390,5 @@ class CharArrWriter extends CharArr {
} }
} }
}
} }

View File

@ -132,7 +132,7 @@ public class JSONParser {
return "Unknown: " + e; return "Unknown: " + e;
} }
private static final CharArr devNull = new NullCharArr(); private static final CharArr devNull = new CharArr.NullCharArr();
protected int flags = FLAGS_DEFAULT; protected int flags = FLAGS_DEFAULT;