diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 0e09e496f97..d163d118726 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -263,6 +263,8 @@ Other Changes * SOLR-14280: Improve error reporting in SolrConfig (Andras Salamon via Jason Gerlowski) +* SOLR-14474: Fix remaining auxilliary class warnings in Solr (Erick Erickson) + ================== 8.5.1 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 9ba49004539..13985709a7b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -19,50 +19,13 @@ package org.apache.solr.cloud; import java.io.Closeable; import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.fs.Path; -import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.solr.cloud.overseer.OverseerAction; -import org.apache.solr.common.SolrException; -import org.apache.solr.common.SolrException.ErrorCode; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.DocCollection; -import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; -import org.apache.solr.common.cloud.ZkCmdExecutor; -import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.util.RetryUtil; -import org.apache.solr.common.util.Utils; -import org.apache.solr.core.CoreContainer; -import org.apache.solr.core.SolrCore; -import org.apache.solr.logging.MDCLoggingContext; -import org.apache.solr.search.SolrIndexSearcher; -import org.apache.solr.update.PeerSync; -import org.apache.solr.update.UpdateLog; -import org.apache.solr.util.RefCounted; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.KeeperException.SessionExpiredException; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; -import org.apache.zookeeper.OpResult.SetDataResult; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.common.params.CommonParams.ID; - public abstract class ElectionContext implements Closeable { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); final String electionPath; @@ -111,676 +74,4 @@ public abstract class ElectionContext implements Closeable { } } -class ShardLeaderElectionContextBase extends ElectionContext { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - protected final SolrZkClient zkClient; - protected String shardId; - protected String collection; - protected LeaderElector leaderElector; - protected ZkStateReader zkStateReader; - protected ZkController zkController; - private Integer leaderZkNodeParentVersion; - // Prevents a race between cancelling and becoming leader. - private final Object lock = new Object(); - - public ShardLeaderElectionContextBase(LeaderElector leaderElector, - final String shardId, final String collection, final String coreNodeName, - ZkNodeProps props, ZkController zkController) { - super(coreNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection - + "/leader_elect/" + shardId, ZkStateReader.getShardLeadersPath( - collection, shardId), props, zkController.getZkClient()); - this.leaderElector = leaderElector; - this.zkStateReader = zkController.getZkStateReader(); - this.zkClient = zkStateReader.getZkClient(); - this.zkController = zkController; - this.shardId = shardId; - this.collection = collection; - - String parent = new Path(leaderPath).getParent().toString(); - ZkCmdExecutor zcmd = new ZkCmdExecutor(30000); - // only if /collections/{collection} exists already do we succeed in creating this path - log.info("make sure parent is created {}", parent); - try { - zcmd.ensureExists(parent, (byte[])null, CreateMode.PERSISTENT, zkClient, 2); - } catch (KeeperException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - - @Override - public void cancelElection() throws InterruptedException, KeeperException { - super.cancelElection(); - synchronized (lock) { - if (leaderZkNodeParentVersion != null) { - try { - // We need to be careful and make sure we *only* delete our own leader registration node. - // We do this by using a multi and ensuring the parent znode of the leader registration node - // matches the version we expect - there is a setData call that increments the parent's znode - // version whenever a leader registers. - log.debug("Removing leader registration node on cancel: {} {}", leaderPath, leaderZkNodeParentVersion); - List ops = new ArrayList<>(2); - ops.add(Op.check(new Path(leaderPath).getParent().toString(), leaderZkNodeParentVersion)); - ops.add(Op.delete(leaderPath, -1)); - zkClient.multi(ops, true); - } catch (KeeperException.NoNodeException nne) { - // no problem - log.debug("No leader registration node found to remove: {}", leaderPath); - } catch (KeeperException.BadVersionException bve) { - log.info("Cannot remove leader registration node because the current registered node is not ours: {}", leaderPath); - // no problem - } catch (InterruptedException e) { - throw e; - } catch (Exception e) { - SolrException.log(log, e); - } - leaderZkNodeParentVersion = null; - } else { - log.info("No version found for ephemeral leader parent node, won't remove previous leader registration."); - } - } - } - - @Override - void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) - throws KeeperException, InterruptedException, IOException { - // register as leader - if an ephemeral is already there, wait to see if it goes away - - String parent = new Path(leaderPath).getParent().toString(); - try { - RetryUtil.retryOnThrowable(NodeExistsException.class, 60000, 5000, () -> { - synchronized (lock) { - log.info("Creating leader registration node {} after winning as {}", leaderPath, leaderSeqPath); - List ops = new ArrayList<>(2); - - // We use a multi operation to get the parent nodes version, which will - // be used to make sure we only remove our own leader registration node. - // The setData call used to get the parent version is also the trigger to - // increment the version. We also do a sanity check that our leaderSeqPath exists. - - ops.add(Op.check(leaderSeqPath, -1)); - ops.add(Op.create(leaderPath, Utils.toJSON(leaderProps), zkClient.getZkACLProvider().getACLsToAdd(leaderPath), CreateMode.EPHEMERAL)); - ops.add(Op.setData(parent, null, -1)); - List results; - - results = zkClient.multi(ops, true); - for (OpResult result : results) { - if (result.getType() == ZooDefs.OpCode.setData) { - SetDataResult dresult = (SetDataResult) result; - Stat stat = dresult.getStat(); - leaderZkNodeParentVersion = stat.getVersion(); - return; - } - } - assert leaderZkNodeParentVersion != null; - } - }); - } catch (NoNodeException e) { - log.info("Will not register as leader because it seems the election is no longer taking place."); - return; - } catch (Throwable t) { - if (t instanceof OutOfMemoryError) { - throw (OutOfMemoryError) t; - } - throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t); - } - - assert shardId != null; - boolean isAlreadyLeader = false; - if (zkStateReader.getClusterState() != null && - zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() < 2) { - Replica leader = zkStateReader.getLeader(collection, shardId); - if (leader != null - && leader.getBaseUrl().equals(leaderProps.get(ZkStateReader.BASE_URL_PROP)) - && leader.getCoreName().equals(leaderProps.get(ZkStateReader.CORE_NAME_PROP))) { - isAlreadyLeader = true; - } - } - if (!isAlreadyLeader) { - ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(), - ZkStateReader.SHARD_ID_PROP, shardId, - ZkStateReader.COLLECTION_PROP, collection, - ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP), - ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP), - ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString()); - assert zkController != null; - assert zkController.getOverseer() != null; - zkController.getOverseer().offerStateUpdate(Utils.toJSON(m)); - } - } - - public LeaderElector getLeaderElector() { - return leaderElector; - } - - Integer getLeaderZkNodeParentVersion() { - synchronized (lock) { - return leaderZkNodeParentVersion; - } - } -} - -// add core container and stop passing core around... -final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private final CoreContainer cc; - private final SyncStrategy syncStrategy; - - private volatile boolean isClosed = false; - - public ShardLeaderElectionContext(LeaderElector leaderElector, - final String shardId, final String collection, - final String coreNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) { - super(leaderElector, shardId, collection, coreNodeName, props, - zkController); - this.cc = cc; - syncStrategy = new SyncStrategy(cc); - } - - @Override - public void close() { - super.close(); - this.isClosed = true; - syncStrategy.close(); - } - - @Override - public void cancelElection() throws InterruptedException, KeeperException { - String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP); - try (SolrCore core = cc.getCore(coreName)) { - if (core != null) { - core.getCoreDescriptor().getCloudDescriptor().setLeader(false); - } - } - - super.cancelElection(); - } - - @Override - public ElectionContext copy() { - return new ShardLeaderElectionContext(leaderElector, shardId, collection, id, leaderProps, zkController, cc); - } - - /* - * weAreReplacement: has someone else been the leader already? - */ - @Override - void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart) throws KeeperException, - InterruptedException, IOException { - String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP); - ActionThrottle lt; - try (SolrCore core = cc.getCore(coreName)) { - if (core == null ) { - // shutdown or removed - return; - } - MDCLoggingContext.setCore(core); - lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle(); - } - - try { - lt.minimumWaitBetweenActions(); - lt.markAttemptingAction(); - - - int leaderVoteWait = cc.getZkController().getLeaderVoteWait(); - - log.debug("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait); - if (zkController.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() > 1) { - // Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica. - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(), - ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection); - zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m)); - } - - boolean allReplicasInLine = false; - if (!weAreReplacement) { - allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait); - } else { - allReplicasInLine = areAllReplicasParticipating(); - } - - if (isClosed) { - // Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the later, - // we cannot be sure we are still the leader, so we should bail out. The OnReconnect handler will - // re-register the cores and handle a new leadership election. - return; - } - - Replica.Type replicaType; - String coreNodeName; - boolean setTermToMax = false; - try (SolrCore core = cc.getCore(coreName)) { - - if (core == null) { - return; - } - - replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType(); - coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); - // should I be leader? - ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); - if (zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) { - if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreNodeName, leaderVoteWait)) { - rejoinLeaderElection(core); - return; - } else { - // only log an error if this replica win the election - setTermToMax = true; - } - } - - if (isClosed) { - return; - } - - log.info("I may be the new leader - try and sync"); - - // we are going to attempt to be the leader - // first cancel any current recovery - core.getUpdateHandler().getSolrCoreState().cancelRecovery(); - - if (weAreReplacement) { - // wait a moment for any floating updates to finish - try { - Thread.sleep(2500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e); - } - } - - PeerSync.PeerSyncResult result = null; - boolean success = false; - try { - result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement); - success = result.isSuccess(); - } catch (Exception e) { - SolrException.log(log, "Exception while trying to sync", e); - result = PeerSync.PeerSyncResult.failure(); - } - - UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); - - if (!success) { - boolean hasRecentUpdates = false; - if (ulog != null) { - // TODO: we could optimize this if necessary - try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { - hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty(); - } - } - - if (!hasRecentUpdates) { - // we failed sync, but we have no versions - we can't sync in that case - // - we were active - // before, so become leader anyway if no one else has any versions either - if (result.getOtherHasVersions().orElse(false)) { - log.info("We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader"); - success = false; - } else { - log.info( - "We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway"); - success = true; - } - } - } - - // solrcloud_debug - if (log.isDebugEnabled()) { - try { - RefCounted searchHolder = core.getNewestSearcher(false); - SolrIndexSearcher searcher = searchHolder.get(); - try { - if (log.isDebugEnabled()) { - log.debug("{} synched {}", core.getCoreContainer().getZkController().getNodeName() - , searcher.count(new MatchAllDocsQuery())); - } - } finally { - searchHolder.decref(); - } - } catch (Exception e) { - log.error("Error in solrcloud_debug block", e); - } - } - if (!success) { - rejoinLeaderElection(core); - return; - } - - } - - boolean isLeader = true; - if (!isClosed) { - try { - if (replicaType == Replica.Type.TLOG) { - // stop replicate from old leader - zkController.stopReplicationFromLeader(coreName); - if (weAreReplacement) { - try (SolrCore core = cc.getCore(coreName)) { - Future future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog(); - if (future != null) { - log.info("Replaying tlog before become new leader"); - future.get(); - } else { - log.info("New leader does not have old tlog to replay"); - } - } - } - } - // in case of leaderVoteWait timeout, a replica with lower term can win the election - if (setTermToMax) { - log.error("WARNING: Potential data loss -- Replica {} became leader after timeout (leaderVoteWait) {}" - , "without being up-to-date with the previous leader", coreNodeName); - zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName); - } - super.runLeaderProcess(weAreReplacement, 0); - try (SolrCore core = cc.getCore(coreName)) { - if (core != null) { - core.getCoreDescriptor().getCloudDescriptor().setLeader(true); - publishActiveIfRegisteredAndNotActive(core); - } else { - return; - } - } - if (log.isInfoEnabled()) { - log.info("I am the new leader: {} {}", ZkCoreNodeProps.getCoreUrl(leaderProps), shardId); - } - - // we made it as leader - send any recovery requests we need to - syncStrategy.requestRecoveries(); - - } catch (SessionExpiredException e) { - throw new SolrException(ErrorCode.SERVER_ERROR, - "ZK session expired - cancelling election for " + collection + " " + shardId); - } catch (Exception e) { - isLeader = false; - SolrException.log(log, "There was a problem trying to register as the leader", e); - - try (SolrCore core = cc.getCore(coreName)) { - - if (core == null) { - if (log.isDebugEnabled()) { - log.debug("SolrCore not found: {} in {}", coreName, cc.getLoadedCoreNames()); - } - return; - } - - core.getCoreDescriptor().getCloudDescriptor().setLeader(false); - - // we could not publish ourselves as leader - try and rejoin election - try { - rejoinLeaderElection(core); - } catch (SessionExpiredException exc) { - throw new SolrException(ErrorCode.SERVER_ERROR, - "ZK session expired - cancelling election for " + collection + " " + shardId); - } - } - } - } else { - cancelElection(); - } - } finally { - MDCLoggingContext.clear(); - } - } - - /** - * Wait for other replicas with higher terms participate in the electioon - * @return true if after {@code timeout} there are no other replicas with higher term participate in the election, - * false if otherwise - */ - private boolean waitForEligibleBecomeLeaderAfterTimeout(ZkShardTerms zkShardTerms, String coreNodeName, int timeout) throws InterruptedException { - long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); - while (!isClosed && !cc.isShutDown()) { - if (System.nanoTime() > timeoutAt) { - log.warn("After waiting for {}ms, no other potential leader was found, {} try to become leader anyway (core_term:{}, highest_term:{})", - timeout, coreNodeName, zkShardTerms.getTerm(coreNodeName), zkShardTerms.getHighestTerm()); - return true; - } - if (replicasWithHigherTermParticipated(zkShardTerms, coreNodeName)) { - log.info("Can't become leader, other replicas with higher term participated in leader election"); - return false; - } - Thread.sleep(500L); - } - return false; - } - - /** - * Do other replicas with higher term participated in the election - * @return true if other replicas with higher term participated in the election, false if otherwise - */ - private boolean replicasWithHigherTermParticipated(ZkShardTerms zkShardTerms, String coreNodeName) { - ClusterState clusterState = zkController.getClusterState(); - DocCollection docCollection = clusterState.getCollectionOrNull(collection); - Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId); - if (slices == null) return false; - - long replicaTerm = zkShardTerms.getTerm(coreNodeName); - boolean isRecovering = zkShardTerms.isRecovering(coreNodeName); - - for (Replica replica : slices.getReplicas()) { - if (replica.getName().equals(coreNodeName)) continue; - - if (clusterState.getLiveNodes().contains(replica.getNodeName())) { - long otherTerm = zkShardTerms.getTerm(replica.getName()); - boolean isOtherReplicaRecovering = zkShardTerms.isRecovering(replica.getName()); - - if (isRecovering && !isOtherReplicaRecovering) return true; - if (otherTerm > replicaTerm) return true; - } - } - return false; - } - - public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception { - if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) { - ZkStateReader zkStateReader = zkController.getZkStateReader(); - zkStateReader.forceUpdateCollection(collection); - ClusterState clusterState = zkStateReader.getClusterState(); - Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP)); - if (rep == null) return; - if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) { - log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE"); - zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE); - } - } - } - - private Replica getReplica(ClusterState clusterState, String collectionName, String replicaName) { - if (clusterState == null) return null; - final DocCollection docCollection = clusterState.getCollectionOrNull(collectionName); - if (docCollection == null) return null; - return docCollection.getReplica(replicaName); - } - - // returns true if all replicas are found to be up, false if not - private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException { - long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS); - final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE; - - DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection); - Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId); - int cnt = 0; - while (!isClosed && !cc.isShutDown()) { - // wait for everyone to be up - if (slices != null) { - int found = 0; - try { - found = zkClient.getChildren(shardsElectZkPath, null, true).size(); - } catch (KeeperException e) { - if (e instanceof KeeperException.SessionExpiredException) { - // if the session has expired, then another election will be launched, so - // quit here - throw new SolrException(ErrorCode.SERVER_ERROR, - "ZK session expired - cancelling election for " + collection + " " + shardId); - } - SolrException.log(log, - "Error checking for the number of election participants", e); - } - - // on startup and after connection timeout, wait for all known shards - if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) { - log.info("Enough replicas found to continue."); - return true; - } else { - if (cnt % 40 == 0) { - if (log.isInfoEnabled()) { - log.info("Waiting until we see more replicas up for shard {}: total={} found={} timeoute in={}ms" - , shardId, slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(), found, - TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS)); - } - } - } - - if (System.nanoTime() > timeoutAt) { - log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later"); - return false; - } - } else { - log.warn("Shard not found: {} for collection {}", shardId, collection); - - return false; - - } - - Thread.sleep(500); - docCollection = zkController.getClusterState().getCollectionOrNull(collection); - slices = (docCollection == null) ? null : docCollection.getSlice(shardId); - cnt++; - } - return false; - } - - // returns true if all replicas are found to be up, false if not - private boolean areAllReplicasParticipating() throws InterruptedException { - final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE; - final DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection); - - if (docCollection != null && docCollection.getSlice(shardId) != null) { - final Slice slices = docCollection.getSlice(shardId); - int found = 0; - try { - found = zkClient.getChildren(shardsElectZkPath, null, true).size(); - } catch (KeeperException e) { - if (e instanceof KeeperException.SessionExpiredException) { - // if the session has expired, then another election will be launched, so - // quit here - throw new SolrException(ErrorCode.SERVER_ERROR, - "ZK session expired - cancelling election for " + collection + " " + shardId); - } - SolrException.log(log, "Error checking for the number of election participants", e); - } - - if (found >= slices.getReplicasMap().size()) { - log.debug("All replicas are ready to participate in election."); - return true; - } - } else { - log.warn("Shard not found: {} for collection {}", shardId, collection); - return false; - } - return false; - } - - private void rejoinLeaderElection(SolrCore core) - throws InterruptedException, KeeperException, IOException { - // remove our ephemeral and re join the election - if (cc.isShutDown()) { - log.debug("Not rejoining election because CoreContainer is closed"); - return; - } - - log.info("There may be a better leader candidate than us - going back into recovery"); - - cancelElection(); - - core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor()); - - leaderElector.joinElection(this, true); - } - -} - -final class OverseerElectionContext extends ElectionContext { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final SolrZkClient zkClient; - private final Overseer overseer; - private volatile boolean isClosed = false; - - public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) { - super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", null, zkClient); - this.overseer = overseer; - this.zkClient = zkClient; - try { - new ZkCmdExecutor(zkClient.getZkClientTimeout()).ensureExists(Overseer.OVERSEER_ELECT, zkClient); - } catch (KeeperException e) { - throw new SolrException(ErrorCode.SERVER_ERROR, e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SolrException(ErrorCode.SERVER_ERROR, e); - } - } - - @Override - void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException, - InterruptedException { - if (isClosed) { - return; - } - log.info("I am going to be the leader {}", id); - final String id = leaderSeqPath - .substring(leaderSeqPath.lastIndexOf("/") + 1); - ZkNodeProps myProps = new ZkNodeProps(ID, id); - - zkClient.makePath(leaderPath, Utils.toJSON(myProps), - CreateMode.EPHEMERAL, true); - if(pauseBeforeStartMs >0){ - try { - Thread.sleep(pauseBeforeStartMs); - } catch (InterruptedException e) { - Thread.interrupted(); - log.warn("Wait interrupted ", e); - } - } - synchronized (this) { - if (!this.isClosed && !overseer.getZkController().getCoreContainer().isShutDown()) { - overseer.start(id); - } - } - } - - @Override - public void cancelElection() throws InterruptedException, KeeperException { - super.cancelElection(); - overseer.close(); - } - - @Override - public synchronized void close() { - this.isClosed = true; - overseer.close(); - } - - @Override - public ElectionContext copy() { - return new OverseerElectionContext(zkClient, overseer ,id); - } - - @Override - public void joinedElectionFired() { - overseer.close(); - } - - @Override - public void checkIfIamLeaderFired() { - // leader changed - close the overseer - overseer.close(); - } - -} diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java new file mode 100644 index 00000000000..e25befa79cb --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.lang.invoke.MethodHandles; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkCmdExecutor; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.util.Utils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.common.params.CommonParams.ID; + +final class OverseerElectionContext extends ElectionContext { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final SolrZkClient zkClient; + private final Overseer overseer; + private volatile boolean isClosed = false; + + public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) { + super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", null, zkClient); + this.overseer = overseer; + this.zkClient = zkClient; + try { + new ZkCmdExecutor(zkClient.getZkClientTimeout()).ensureExists(Overseer.OVERSEER_ELECT, zkClient); + } catch (KeeperException e) { + throw new SolrException(ErrorCode.SERVER_ERROR, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SolrException(ErrorCode.SERVER_ERROR, e); + } + } + + @Override + void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException, + InterruptedException { + if (isClosed) { + return; + } + log.info("I am going to be the leader {}", id); + final String id = leaderSeqPath + .substring(leaderSeqPath.lastIndexOf("/") + 1); + ZkNodeProps myProps = new ZkNodeProps(ID, id); + + zkClient.makePath(leaderPath, Utils.toJSON(myProps), + CreateMode.EPHEMERAL, true); + if (pauseBeforeStartMs > 0) { + try { + Thread.sleep(pauseBeforeStartMs); + } catch (InterruptedException e) { + Thread.interrupted(); + log.warn("Wait interrupted ", e); + } + } + synchronized (this) { + if (!this.isClosed && !overseer.getZkController().getCoreContainer().isShutDown()) { + overseer.start(id); + } + } + } + + @Override + public void cancelElection() throws InterruptedException, KeeperException { + super.cancelElection(); + overseer.close(); + } + + @Override + public synchronized void close() { + this.isClosed = true; + overseer.close(); + } + + @Override + public ElectionContext copy() { + return new OverseerElectionContext(zkClient, overseer, id); + } + + @Override + public void joinedElectionFired() { + overseer.close(); + } + + @Override + public void checkIfIamLeaderFired() { + // leader changed - close the overseer + overseer.close(); + } + +} diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java new file mode 100644 index 00000000000..f6c96caf205 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.EnumSet; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.solr.cloud.overseer.OverseerAction; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.logging.MDCLoggingContext; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.update.PeerSync; +import org.apache.solr.update.UpdateLog; +import org.apache.solr.util.RefCounted; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// add core container and stop passing core around... +final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final CoreContainer cc; + private final SyncStrategy syncStrategy; + + private volatile boolean isClosed = false; + + public ShardLeaderElectionContext(LeaderElector leaderElector, + final String shardId, final String collection, + final String coreNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) { + super(leaderElector, shardId, collection, coreNodeName, props, + zkController); + this.cc = cc; + syncStrategy = new SyncStrategy(cc); + } + + @Override + public void close() { + super.close(); + this.isClosed = true; + syncStrategy.close(); + } + + @Override + public void cancelElection() throws InterruptedException, KeeperException { + String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP); + try (SolrCore core = cc.getCore(coreName)) { + if (core != null) { + core.getCoreDescriptor().getCloudDescriptor().setLeader(false); + } + } + + super.cancelElection(); + } + + @Override + public ElectionContext copy() { + return new ShardLeaderElectionContext(leaderElector, shardId, collection, id, leaderProps, zkController, cc); + } + + /* + * weAreReplacement: has someone else been the leader already? + */ + @Override + void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStart) throws KeeperException, + InterruptedException, IOException { + String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP); + ActionThrottle lt; + try (SolrCore core = cc.getCore(coreName)) { + if (core == null) { + // shutdown or removed + return; + } + MDCLoggingContext.setCore(core); + lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle(); + } + + try { + lt.minimumWaitBetweenActions(); + lt.markAttemptingAction(); + + + int leaderVoteWait = cc.getZkController().getLeaderVoteWait(); + + log.debug("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait); + if (zkController.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() > 1) { + // Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica. + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(), + ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection); + zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m)); + } + + boolean allReplicasInLine = false; + if (!weAreReplacement) { + allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait); + } else { + allReplicasInLine = areAllReplicasParticipating(); + } + + if (isClosed) { + // Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the later, + // we cannot be sure we are still the leader, so we should bail out. The OnReconnect handler will + // re-register the cores and handle a new leadership election. + return; + } + + Replica.Type replicaType; + String coreNodeName; + boolean setTermToMax = false; + try (SolrCore core = cc.getCore(coreName)) { + + if (core == null) { + return; + } + + replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType(); + coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(); + // should I be leader? + ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); + if (zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) { + if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreNodeName, leaderVoteWait)) { + rejoinLeaderElection(core); + return; + } else { + // only log an error if this replica win the election + setTermToMax = true; + } + } + + if (isClosed) { + return; + } + + log.info("I may be the new leader - try and sync"); + + // we are going to attempt to be the leader + // first cancel any current recovery + core.getUpdateHandler().getSolrCoreState().cancelRecovery(); + + if (weAreReplacement) { + // wait a moment for any floating updates to finish + try { + Thread.sleep(2500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e); + } + } + + PeerSync.PeerSyncResult result = null; + boolean success = false; + try { + result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement); + success = result.isSuccess(); + } catch (Exception e) { + SolrException.log(log, "Exception while trying to sync", e); + result = PeerSync.PeerSyncResult.failure(); + } + + UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); + + if (!success) { + boolean hasRecentUpdates = false; + if (ulog != null) { + // TODO: we could optimize this if necessary + try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { + hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty(); + } + } + + if (!hasRecentUpdates) { + // we failed sync, but we have no versions - we can't sync in that case + // - we were active + // before, so become leader anyway if no one else has any versions either + if (result.getOtherHasVersions().orElse(false)) { + log.info("We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader"); + success = false; + } else { + log.info( + "We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway"); + success = true; + } + } + } + + // solrcloud_debug + if (log.isDebugEnabled()) { + try { + RefCounted searchHolder = core.getNewestSearcher(false); + SolrIndexSearcher searcher = searchHolder.get(); + try { + if (log.isDebugEnabled()) { + log.debug("{} synched {}", core.getCoreContainer().getZkController().getNodeName() + , searcher.count(new MatchAllDocsQuery())); + } + } finally { + searchHolder.decref(); + } + } catch (Exception e) { + log.error("Error in solrcloud_debug block", e); + } + } + if (!success) { + rejoinLeaderElection(core); + return; + } + + } + + boolean isLeader = true; + if (!isClosed) { + try { + if (replicaType == Replica.Type.TLOG) { + // stop replicate from old leader + zkController.stopReplicationFromLeader(coreName); + if (weAreReplacement) { + try (SolrCore core = cc.getCore(coreName)) { + Future future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog(); + if (future != null) { + log.info("Replaying tlog before become new leader"); + future.get(); + } else { + log.info("New leader does not have old tlog to replay"); + } + } + } + } + // in case of leaderVoteWait timeout, a replica with lower term can win the election + if (setTermToMax) { + log.error("WARNING: Potential data loss -- Replica {} became leader after timeout (leaderVoteWait) {}" + , "without being up-to-date with the previous leader", coreNodeName); + zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName); + } + super.runLeaderProcess(weAreReplacement, 0); + try (SolrCore core = cc.getCore(coreName)) { + if (core != null) { + core.getCoreDescriptor().getCloudDescriptor().setLeader(true); + publishActiveIfRegisteredAndNotActive(core); + } else { + return; + } + } + if (log.isInfoEnabled()) { + log.info("I am the new leader: {} {}", ZkCoreNodeProps.getCoreUrl(leaderProps), shardId); + } + + // we made it as leader - send any recovery requests we need to + syncStrategy.requestRecoveries(); + + } catch (SessionExpiredException e) { + throw new SolrException(ErrorCode.SERVER_ERROR, + "ZK session expired - cancelling election for " + collection + " " + shardId); + } catch (Exception e) { + isLeader = false; + SolrException.log(log, "There was a problem trying to register as the leader", e); + + try (SolrCore core = cc.getCore(coreName)) { + + if (core == null) { + if (log.isDebugEnabled()) { + log.debug("SolrCore not found: {} in {}", coreName, cc.getLoadedCoreNames()); + } + return; + } + + core.getCoreDescriptor().getCloudDescriptor().setLeader(false); + + // we could not publish ourselves as leader - try and rejoin election + try { + rejoinLeaderElection(core); + } catch (SessionExpiredException exc) { + throw new SolrException(ErrorCode.SERVER_ERROR, + "ZK session expired - cancelling election for " + collection + " " + shardId); + } + } + } + } else { + cancelElection(); + } + } finally { + MDCLoggingContext.clear(); + } + } + + /** + * Wait for other replicas with higher terms participate in the electioon + * + * @return true if after {@code timeout} there are no other replicas with higher term participate in the election, + * false if otherwise + */ + private boolean waitForEligibleBecomeLeaderAfterTimeout(ZkShardTerms zkShardTerms, String coreNodeName, int timeout) throws InterruptedException { + long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); + while (!isClosed && !cc.isShutDown()) { + if (System.nanoTime() > timeoutAt) { + log.warn("After waiting for {}ms, no other potential leader was found, {} try to become leader anyway (core_term:{}, highest_term:{})", + timeout, coreNodeName, zkShardTerms.getTerm(coreNodeName), zkShardTerms.getHighestTerm()); + return true; + } + if (replicasWithHigherTermParticipated(zkShardTerms, coreNodeName)) { + log.info("Can't become leader, other replicas with higher term participated in leader election"); + return false; + } + Thread.sleep(500L); + } + return false; + } + + /** + * Do other replicas with higher term participated in the election + * + * @return true if other replicas with higher term participated in the election, false if otherwise + */ + private boolean replicasWithHigherTermParticipated(ZkShardTerms zkShardTerms, String coreNodeName) { + ClusterState clusterState = zkController.getClusterState(); + DocCollection docCollection = clusterState.getCollectionOrNull(collection); + Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId); + if (slices == null) return false; + + long replicaTerm = zkShardTerms.getTerm(coreNodeName); + boolean isRecovering = zkShardTerms.isRecovering(coreNodeName); + + for (Replica replica : slices.getReplicas()) { + if (replica.getName().equals(coreNodeName)) continue; + + if (clusterState.getLiveNodes().contains(replica.getNodeName())) { + long otherTerm = zkShardTerms.getTerm(replica.getName()); + boolean isOtherReplicaRecovering = zkShardTerms.isRecovering(replica.getName()); + + if (isRecovering && !isOtherReplicaRecovering) return true; + if (otherTerm > replicaTerm) return true; + } + } + return false; + } + + public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception { + if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) { + ZkStateReader zkStateReader = zkController.getZkStateReader(); + zkStateReader.forceUpdateCollection(collection); + ClusterState clusterState = zkStateReader.getClusterState(); + Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP)); + if (rep == null) return; + if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) { + log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE"); + zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE); + } + } + } + + private Replica getReplica(ClusterState clusterState, String collectionName, String replicaName) { + if (clusterState == null) return null; + final DocCollection docCollection = clusterState.getCollectionOrNull(collectionName); + if (docCollection == null) return null; + return docCollection.getReplica(replicaName); + } + + // returns true if all replicas are found to be up, false if not + private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException { + long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS); + final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE; + + DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection); + Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId); + int cnt = 0; + while (!isClosed && !cc.isShutDown()) { + // wait for everyone to be up + if (slices != null) { + int found = 0; + try { + found = zkClient.getChildren(shardsElectZkPath, null, true).size(); + } catch (KeeperException e) { + if (e instanceof KeeperException.SessionExpiredException) { + // if the session has expired, then another election will be launched, so + // quit here + throw new SolrException(ErrorCode.SERVER_ERROR, + "ZK session expired - cancelling election for " + collection + " " + shardId); + } + SolrException.log(log, + "Error checking for the number of election participants", e); + } + + // on startup and after connection timeout, wait for all known shards + if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) { + log.info("Enough replicas found to continue."); + return true; + } else { + if (cnt % 40 == 0) { + if (log.isInfoEnabled()) { + log.info("Waiting until we see more replicas up for shard {}: total={} found={} timeoute in={}ms" + , shardId, slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(), found, + TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS)); + } + } + } + + if (System.nanoTime() > timeoutAt) { + log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later"); + return false; + } + } else { + log.warn("Shard not found: {} for collection {}", shardId, collection); + + return false; + + } + + Thread.sleep(500); + docCollection = zkController.getClusterState().getCollectionOrNull(collection); + slices = (docCollection == null) ? null : docCollection.getSlice(shardId); + cnt++; + } + return false; + } + + // returns true if all replicas are found to be up, false if not + private boolean areAllReplicasParticipating() throws InterruptedException { + final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE; + final DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection); + + if (docCollection != null && docCollection.getSlice(shardId) != null) { + final Slice slices = docCollection.getSlice(shardId); + int found = 0; + try { + found = zkClient.getChildren(shardsElectZkPath, null, true).size(); + } catch (KeeperException e) { + if (e instanceof KeeperException.SessionExpiredException) { + // if the session has expired, then another election will be launched, so + // quit here + throw new SolrException(ErrorCode.SERVER_ERROR, + "ZK session expired - cancelling election for " + collection + " " + shardId); + } + SolrException.log(log, "Error checking for the number of election participants", e); + } + + if (found >= slices.getReplicasMap().size()) { + log.debug("All replicas are ready to participate in election."); + return true; + } + } else { + log.warn("Shard not found: {} for collection {}", shardId, collection); + return false; + } + return false; + } + + private void rejoinLeaderElection(SolrCore core) + throws InterruptedException, KeeperException, IOException { + // remove our ephemeral and re join the election + if (cc.isShutDown()) { + log.debug("Not rejoining election because CoreContainer is closed"); + return; + } + + log.info("There may be a better leader candidate than us - going back into recovery"); + + cancelElection(); + + core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor()); + + leaderElector.joinElection(this, true); + } + +} diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java new file mode 100644 index 00000000000..a9afc8df34e --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.ArrayList; + +import org.apache.hadoop.fs.Path; +import org.apache.solr.cloud.overseer.OverseerAction; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkCmdExecutor; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.RetryUtil; +import org.apache.solr.common.util.Utils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.OpResult.SetDataResult; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ShardLeaderElectionContextBase extends ElectionContext { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected final SolrZkClient zkClient; + protected String shardId; + protected String collection; + protected LeaderElector leaderElector; + protected ZkStateReader zkStateReader; + protected ZkController zkController; + private Integer leaderZkNodeParentVersion; + + // Prevents a race between cancelling and becoming leader. + private final Object lock = new Object(); + + public ShardLeaderElectionContextBase(LeaderElector leaderElector, + final String shardId, final String collection, final String coreNodeName, + ZkNodeProps props, ZkController zkController) { + super(coreNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + + "/leader_elect/" + shardId, ZkStateReader.getShardLeadersPath( + collection, shardId), props, zkController.getZkClient()); + this.leaderElector = leaderElector; + this.zkStateReader = zkController.getZkStateReader(); + this.zkClient = zkStateReader.getZkClient(); + this.zkController = zkController; + this.shardId = shardId; + this.collection = collection; + + String parent = new Path(leaderPath).getParent().toString(); + ZkCmdExecutor zcmd = new ZkCmdExecutor(30000); + // only if /collections/{collection} exists already do we succeed in creating this path + log.info("make sure parent is created {}", parent); + try { + zcmd.ensureExists(parent, (byte[]) null, CreateMode.PERSISTENT, zkClient, 2); + } catch (KeeperException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @Override + public void cancelElection() throws InterruptedException, KeeperException { + super.cancelElection(); + synchronized (lock) { + if (leaderZkNodeParentVersion != null) { + // no problem + // no problem + try { + // We need to be careful and make sure we *only* delete our own leader registration node. + // We do this by using a multi and ensuring the parent znode of the leader registration node + // matches the version we expect - there is a setData call that increments the parent's znode + // version whenever a leader registers. + log.debug("Removing leader registration node on cancel: {} {}", leaderPath, leaderZkNodeParentVersion); + List ops = new ArrayList<>(2); + ops.add(Op.check(new Path(leaderPath).getParent().toString(), leaderZkNodeParentVersion)); + ops.add(Op.delete(leaderPath, -1)); + zkClient.multi(ops, true); + } catch (InterruptedException e) { + throw e; + } catch (IllegalArgumentException e) { + SolrException.log(log, e); + } + leaderZkNodeParentVersion = null; + } else { + log.info("No version found for ephemeral leader parent node, won't remove previous leader registration."); + } + } + } + + @Override + void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) + throws KeeperException, InterruptedException, IOException { + // register as leader - if an ephemeral is already there, wait to see if it goes away + + String parent = new Path(leaderPath).getParent().toString(); + try { + RetryUtil.retryOnThrowable(NodeExistsException.class, 60000, 5000, () -> { + synchronized (lock) { + log.info("Creating leader registration node {} after winning as {}", leaderPath, leaderSeqPath); + List ops = new ArrayList<>(2); + + // We use a multi operation to get the parent nodes version, which will + // be used to make sure we only remove our own leader registration node. + // The setData call used to get the parent version is also the trigger to + // increment the version. We also do a sanity check that our leaderSeqPath exists. + + ops.add(Op.check(leaderSeqPath, -1)); + ops.add(Op.create(leaderPath, Utils.toJSON(leaderProps), zkClient.getZkACLProvider().getACLsToAdd(leaderPath), CreateMode.EPHEMERAL)); + ops.add(Op.setData(parent, null, -1)); + List results; + + results = zkClient.multi(ops, true); + for (OpResult result : results) { + if (result.getType() == ZooDefs.OpCode.setData) { + SetDataResult dresult = (SetDataResult) result; + Stat stat = dresult.getStat(); + leaderZkNodeParentVersion = stat.getVersion(); + return; + } + } + assert leaderZkNodeParentVersion != null; + } + }); + } catch (NoNodeException e) { + log.info("Will not register as leader because it seems the election is no longer taking place."); + return; + } catch (Throwable t) { + if (t instanceof OutOfMemoryError) { + throw (OutOfMemoryError) t; + } + throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t); + } + + assert shardId != null; + boolean isAlreadyLeader = false; + if (zkStateReader.getClusterState() != null && + zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() < 2) { + Replica leader = zkStateReader.getLeader(collection, shardId); + if (leader != null + && leader.getBaseUrl().equals(leaderProps.get(ZkStateReader.BASE_URL_PROP)) + && leader.getCoreName().equals(leaderProps.get(ZkStateReader.CORE_NAME_PROP))) { + isAlreadyLeader = true; + } + } + if (!isAlreadyLeader) { + ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(), + ZkStateReader.SHARD_ID_PROP, shardId, + ZkStateReader.COLLECTION_PROP, collection, + ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP), + ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP), + ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString()); + assert zkController != null; + assert zkController.getOverseer() != null; + zkController.getOverseer().offerStateUpdate(Utils.toJSON(m)); + } + } + + public LeaderElector getLeaderElector() { + return leaderElector; + } + + Integer getLeaderZkNodeParentVersion() { + synchronized (lock) { + return leaderZkNodeParentVersion; + } + } +} \ No newline at end of file diff --git a/solr/core/src/java/org/apache/solr/handler/component/PivotFacetProcessor.java b/solr/core/src/java/org/apache/solr/handler/component/PivotFacetProcessor.java index 011d6623837..1069c50d12c 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/PivotFacetProcessor.java +++ b/solr/core/src/java/org/apache/solr/handler/component/PivotFacetProcessor.java @@ -74,7 +74,7 @@ public class PivotFacetProcessor extends SimpleFacets // rb._statsInfo may be null if stats=false, ie: refine requests // if that's the case, but we need to refine w/stats, then we'll lazy init our // own instance of StatsInfo - StatsInfo statsInfo = rb._statsInfo; + StatsInfo statsInfo = rb._statsInfo; SimpleOrderedMap>> pivotResponse = new SimpleOrderedMap<>(); for (String pivotList : pivots) { @@ -237,7 +237,7 @@ public class PivotFacetProcessor extends SimpleFacets * * @return A list of StatsFields to compute for this pivot, or the empty list if none */ - private static List getTaggedStatsFields(StatsInfo statsInfo, + private static List getTaggedStatsFields(StatsInfo statsInfo, String statsLocalParam) { if (null == statsLocalParam || null == statsInfo) { return Collections.emptyList(); diff --git a/solr/core/src/java/org/apache/solr/handler/component/StatsComponent.java b/solr/core/src/java/org/apache/solr/handler/component/StatsComponent.java index fc5c29f829a..4b80dae610d 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/StatsComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/StatsComponent.java @@ -17,15 +17,11 @@ package org.apache.solr.handler.component; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ShardParams; -import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.StatsParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; @@ -41,13 +37,13 @@ public class StatsComponent extends SearchComponent { @Override public void prepare(ResponseBuilder rb) throws IOException { - if (rb.req.getParams().getBool(StatsParams.STATS,false)) { - rb.setNeedDocSet( true ); + if (rb.req.getParams().getBool(StatsParams.STATS, false)) { + rb.setNeedDocSet(true); rb.doStats = true; rb._statsInfo = new StatsInfo(rb); for (StatsField statsField : rb._statsInfo.getStatsFields()) { if (statsField.getSchemaField() != null && statsField.getSchemaField().getType().isPointField() && !statsField.getSchemaField().hasDocValues()) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can't calculate stats on a PointField without docValues"); } } @@ -63,8 +59,8 @@ public class StatsComponent extends SearchComponent { DocSet docs = statsField.computeBaseDocSet(); statsValues.put(statsField.getOutputKey(), statsField.computeLocalStatsValues(docs)); } - - rb.rsp.add( "stats", convertToResponse(statsValues) ); + + rb.rsp.add("stats", convertToResponse(statsValues)); } @Override @@ -96,8 +92,8 @@ public class StatsComponent extends SearchComponent { for (ShardResponse srsp : sreq.responses) { NamedList stats = null; try { - stats = (NamedList>>) - srsp.getSolrResponse().getResponse().get("stats"); + stats = (NamedList>>) + srsp.getSolrResponse().getResponse().get("stats"); } catch (Exception e) { if (ShardParams.getShardsTolerantAsBool(rb.req.getParams())) { continue; // looks like a shard did not return anything @@ -141,17 +137,17 @@ public class StatsComponent extends SearchComponent { /** * Given a map of {@link StatsValues} using the appropriate response key, - * builds up the necessary "stats" data structure for including in the response -- + * builds up the necessary "stats" data structure for including in the response -- * including the esoteric "stats_fields" wrapper. */ public static NamedList>> convertToResponse - (Map statsValues) { + (Map statsValues) { NamedList>> stats = new SimpleOrderedMap<>(); NamedList> stats_fields = new SimpleOrderedMap<>(); stats.add("stats_fields", stats_fields); - - for (Map.Entry entry : statsValues.entrySet()) { + + for (Map.Entry entry : statsValues.entrySet()) { String key = entry.getKey(); NamedList stv = entry.getValue().getStatsValues(); stats_fields.add(key, stv); @@ -169,87 +165,3 @@ public class StatsComponent extends SearchComponent { } } -/** - * Models all of the information about stats needed for a single request - * @see StatsField - */ -class StatsInfo { - - private final ResponseBuilder rb; - private final List statsFields = new ArrayList<>(7); - private final Map distribStatsValues = new LinkedHashMap<>(); - private final Map statsFieldMap = new LinkedHashMap<>(); - private final Map> tagToStatsFields = new LinkedHashMap<>(); - - public StatsInfo(ResponseBuilder rb) { - this.rb = rb; - SolrParams params = rb.req.getParams(); - String[] statsParams = params.getParams(StatsParams.STATS_FIELD); - if (null == statsParams) { - // no stats.field params, nothing to parse. - return; - } - - for (String paramValue : statsParams) { - StatsField current = new StatsField(rb, paramValue); - statsFields.add(current); - for (String tag : current.getTagList()) { - List fieldList = tagToStatsFields.get(tag); - if (fieldList == null) { - fieldList = new ArrayList<>(); - } - fieldList.add(current); - tagToStatsFields.put(tag, fieldList); - } - statsFieldMap.put(current.getOutputKey(), current); - distribStatsValues.put(current.getOutputKey(), - StatsValuesFactory.createStatsValues(current)); - } - } - - /** - * Returns an immutable list of {@link StatsField} instances - * modeling each of the {@link StatsParams#STATS_FIELD} params specified - * as part of this request - */ - public List getStatsFields() { - return Collections.unmodifiableList(statsFields); - } - - /** - * Returns the {@link StatsField} associated with the specified (effective) - * outputKey, or null if there was no {@link StatsParams#STATS_FIELD} param - * that would corrispond with that key. - */ - public StatsField getStatsField(String outputKey) { - return statsFieldMap.get(outputKey); - } - - /** - * Return immutable list of {@link StatsField} instances by string tag local parameter. - * - * @param tag tag local parameter - * @return list of stats fields - */ - public List getStatsFieldsByTag(String tag) { - List raw = tagToStatsFields.get(tag); - if (null == raw) { - return Collections.emptyList(); - } else { - return Collections.unmodifiableList(raw); - } - } - - /** - * Returns an immutable map of response key => {@link StatsValues} - * instances for the current distributed request. - * Depending on where we are in the process of handling this request, - * these {@link StatsValues} instances may not be complete -- but they - * will never be null. - */ - public Map getAggregateStatsValues() { - return Collections.unmodifiableMap(distribStatsValues); - } - -} - diff --git a/solr/core/src/java/org/apache/solr/handler/component/StatsInfo.java b/solr/core/src/java/org/apache/solr/handler/component/StatsInfo.java new file mode 100644 index 00000000000..f3f2871b246 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/component/StatsInfo.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.handler.component; + +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.StatsParams; + +import java.util.*; + +/** + * Models all of the information about stats needed for a single request + * + * @see StatsField + */ +class StatsInfo { + + private final ResponseBuilder rb; + private final List statsFields = new ArrayList<>(7); + private final Map distribStatsValues = new LinkedHashMap<>(); + private final Map statsFieldMap = new LinkedHashMap<>(); + private final Map> tagToStatsFields = new LinkedHashMap<>(); + + public StatsInfo(ResponseBuilder rb) { + this.rb = rb; + SolrParams params = rb.req.getParams(); + String[] statsParams = params.getParams(StatsParams.STATS_FIELD); + if (null == statsParams) { + // no stats.field params, nothing to parse. + return; + } + + for (String paramValue : statsParams) { + StatsField current = new StatsField(rb, paramValue); + statsFields.add(current); + for (String tag : current.getTagList()) { + List fieldList = tagToStatsFields.get(tag); + if (fieldList == null) { + fieldList = new ArrayList<>(); + } + fieldList.add(current); + tagToStatsFields.put(tag, fieldList); + } + statsFieldMap.put(current.getOutputKey(), current); + distribStatsValues.put(current.getOutputKey(), + StatsValuesFactory.createStatsValues(current)); + } + } + + /** + * Returns an immutable list of {@link StatsField} instances + * modeling each of the {@link StatsParams#STATS_FIELD} params specified + * as part of this request + */ + public List getStatsFields() { + return Collections.unmodifiableList(statsFields); + } + + /** + * Returns the {@link StatsField} associated with the specified (effective) + * outputKey, or null if there was no {@link StatsParams#STATS_FIELD} param + * that would corrispond with that key. + */ + public StatsField getStatsField(String outputKey) { + return statsFieldMap.get(outputKey); + } + + /** + * Return immutable list of {@link StatsField} instances by string tag local parameter. + * + * @param tag tag local parameter + * @return list of stats fields + */ + public List getStatsFieldsByTag(String tag) { + List raw = tagToStatsFields.get(tag); + if (null == raw) { + return Collections.emptyList(); + } else { + return Collections.unmodifiableList(raw); + } + } + + /** + * Returns an immutable map of response key => {@link StatsValues} + * instances for the current distributed request. + * Depending on where we are in the process of handling this request, + * these {@link StatsValues} instances may not be complete -- but they + * will never be null. + */ + public Map getAggregateStatsValues() { + return Collections.unmodifiableMap(distribStatsValues); + } + +} diff --git a/solr/core/src/java/org/apache/solr/handler/export/DoubleCmp.java b/solr/core/src/java/org/apache/solr/handler/export/DoubleComp.java similarity index 69% rename from solr/core/src/java/org/apache/solr/handler/export/DoubleCmp.java rename to solr/core/src/java/org/apache/solr/handler/export/DoubleComp.java index 50341fd490c..69739484086 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/DoubleCmp.java +++ b/solr/core/src/java/org/apache/solr/handler/export/DoubleComp.java @@ -19,25 +19,27 @@ package org.apache.solr.handler.export; interface DoubleComp { int compare(double a, double b); + double resetValue(); -} -class DoubleAsc implements DoubleComp { - public double resetValue() { - return Double.MAX_VALUE; + + static class DoubleAsc implements DoubleComp { + public double resetValue() { + return Double.MAX_VALUE; + } + + public int compare(double a, double b) { + return Double.compare(b, a); + } } - public int compare(double a, double b) { - return Double.compare(b, a); - } -} - -class DoubleDesc implements DoubleComp { - public double resetValue() { - return -Double.MAX_VALUE; - } - - public int compare(double a, double b) { - return Double.compare(a, b); + static class DoubleDesc implements DoubleComp { + public double resetValue() { + return -Double.MAX_VALUE; + } + + public int compare(double a, double b) { + return Double.compare(a, b); + } } } diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java index e4d6da0a5bb..adacd776bb6 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java +++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java @@ -408,41 +408,41 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable { if (ft instanceof IntValueFieldType) { if (reverse) { - sortValues[i] = new IntValue(field, new IntDesc()); + sortValues[i] = new IntValue(field, new IntComp.IntDesc()); } else { - sortValues[i] = new IntValue(field, new IntAsc()); + sortValues[i] = new IntValue(field, new IntComp.IntAsc()); } } else if (ft instanceof FloatValueFieldType) { if (reverse) { - sortValues[i] = new FloatValue(field, new FloatDesc()); + sortValues[i] = new FloatValue(field, new FloatComp.FloatDesc()); } else { - sortValues[i] = new FloatValue(field, new FloatAsc()); + sortValues[i] = new FloatValue(field, new FloatComp.FloatAsc()); } } else if (ft instanceof DoubleValueFieldType) { if (reverse) { - sortValues[i] = new DoubleValue(field, new DoubleDesc()); + sortValues[i] = new DoubleValue(field, new DoubleComp.DoubleDesc()); } else { - sortValues[i] = new DoubleValue(field, new DoubleAsc()); + sortValues[i] = new DoubleValue(field, new DoubleComp.DoubleAsc()); } } else if (ft instanceof LongValueFieldType) { if (reverse) { - sortValues[i] = new LongValue(field, new LongDesc()); + sortValues[i] = new LongValue(field, new LongComp.LongDesc()); } else { - sortValues[i] = new LongValue(field, new LongAsc()); + sortValues[i] = new LongValue(field, new LongComp.LongAsc()); } } else if (ft instanceof StrField || ft instanceof SortableTextField) { LeafReader reader = searcher.getSlowAtomicReader(); SortedDocValues vals = reader.getSortedDocValues(field); if (reverse) { - sortValues[i] = new StringValue(vals, field, new IntDesc()); + sortValues[i] = new StringValue(vals, field, new IntComp.IntDesc()); } else { - sortValues[i] = new StringValue(vals, field, new IntAsc()); + sortValues[i] = new StringValue(vals, field, new IntComp.IntAsc()); } } else if (ft instanceof DateValueFieldType) { if (reverse) { - sortValues[i] = new LongValue(field, new LongDesc()); + sortValues[i] = new LongValue(field, new LongComp.LongDesc()); } else { - sortValues[i] = new LongValue(field, new LongAsc()); + sortValues[i] = new LongValue(field, new LongComp.LongAsc()); } } else if (ft instanceof BoolField) { // This is a bit of a hack, but since the boolean field stores ByteRefs, just like Strings @@ -451,9 +451,9 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable { LeafReader reader = searcher.getSlowAtomicReader(); SortedDocValues vals = reader.getSortedDocValues(field); if (reverse) { - sortValues[i] = new StringValue(vals, field, new IntDesc()); + sortValues[i] = new StringValue(vals, field, new IntComp.IntDesc()); } else { - sortValues[i] = new StringValue(vals, field, new IntAsc()); + sortValues[i] = new StringValue(vals, field, new IntComp.IntAsc()); } } else { throw new IOException("Sort fields must be one of the following types: int,float,long,double,string,date,boolean,SortableText"); diff --git a/solr/core/src/java/org/apache/solr/handler/export/FloatCmp.java b/solr/core/src/java/org/apache/solr/handler/export/FloatComp.java similarity index 70% rename from solr/core/src/java/org/apache/solr/handler/export/FloatCmp.java rename to solr/core/src/java/org/apache/solr/handler/export/FloatComp.java index 7ef078c76cf..1ce6e57f028 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/FloatCmp.java +++ b/solr/core/src/java/org/apache/solr/handler/export/FloatComp.java @@ -19,26 +19,26 @@ package org.apache.solr.handler.export; interface FloatComp { int compare(float a, float b); + float resetValue(); -} -class FloatAsc implements FloatComp { - public float resetValue() { - return Float.MAX_VALUE; + static class FloatAsc implements FloatComp { + public float resetValue() { + return Float.MAX_VALUE; + } + + public int compare(float a, float b) { + return Float.compare(b, a); + } } - public int compare(float a, float b) { - return Float.compare(b, a); + static class FloatDesc implements FloatComp { + public float resetValue() { + return -Float.MAX_VALUE; + } + + public int compare(float a, float b) { + return Float.compare(a, b); + } } } - -class FloatDesc implements FloatComp { - public float resetValue() { - return -Float.MAX_VALUE; - } - - public int compare(float a, float b) { - return Float.compare(a, b); - } -} - diff --git a/solr/core/src/java/org/apache/solr/handler/export/IntComp.java b/solr/core/src/java/org/apache/solr/handler/export/IntComp.java index ac83d5dc1e6..b44ebc842a6 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/IntComp.java +++ b/solr/core/src/java/org/apache/solr/handler/export/IntComp.java @@ -19,27 +19,29 @@ package org.apache.solr.handler.export; public interface IntComp { int compare(int a, int b); + int resetValue(); -} -class IntAsc implements IntComp { - public int resetValue() { - return Integer.MAX_VALUE; + static class IntAsc implements IntComp { + + public int resetValue() { + return Integer.MAX_VALUE; + } + + public int compare(int a, int b) { + return Integer.compare(b, a); + } } - public int compare(int a, int b) { - return Integer.compare(b, a); - } -} + static class IntDesc implements IntComp { -class IntDesc implements IntComp { + public int resetValue() { + return Integer.MIN_VALUE; + } - public int resetValue() { - return Integer.MIN_VALUE; + public int compare(int a, int b) { + return Integer.compare(a, b); + } } - - public int compare(int a, int b) { - return Integer.compare(a, b); - } -} +} \ No newline at end of file diff --git a/solr/core/src/java/org/apache/solr/handler/export/LongCmp.java b/solr/core/src/java/org/apache/solr/handler/export/LongComp.java similarity index 70% rename from solr/core/src/java/org/apache/solr/handler/export/LongCmp.java rename to solr/core/src/java/org/apache/solr/handler/export/LongComp.java index 7d997acba1a..45a522c23f3 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/LongCmp.java +++ b/solr/core/src/java/org/apache/solr/handler/export/LongComp.java @@ -19,27 +19,28 @@ package org.apache.solr.handler.export; interface LongComp { int compare(long a, long b); + long resetValue(); -} -class LongAsc implements LongComp { + static class LongAsc implements LongComp { - public long resetValue() { - return Long.MAX_VALUE; + public long resetValue() { + return Long.MAX_VALUE; + } + + public int compare(long a, long b) { + return Long.compare(b, a); + } } - public int compare(long a, long b) { - return Long.compare(b, a); - } -} + static class LongDesc implements LongComp { -class LongDesc implements LongComp { + public long resetValue() { + return Long.MIN_VALUE; + } - public long resetValue() { - return Long.MIN_VALUE; + public int compare(long a, long b) { + return Long.compare(a, b); + } } - - public int compare(long a, long b) { - return Long.compare(a, b); - } -} +} \ No newline at end of file diff --git a/solr/core/src/java/org/apache/solr/response/transform/ShardAugmenterFactory.java b/solr/core/src/java/org/apache/solr/response/transform/ShardAugmenterFactory.java index e65bb93fd62..e01ba283323 100644 --- a/solr/core/src/java/org/apache/solr/response/transform/ShardAugmenterFactory.java +++ b/solr/core/src/java/org/apache/solr/response/transform/ShardAugmenterFactory.java @@ -38,7 +38,7 @@ public class ShardAugmenterFactory extends TransformerFactory v = "[not a shard request]"; } } - return new ValueAugmenter( field, v ); + return new ValueAugmenterFactory.ValueAugmenter( field, v ); } } diff --git a/solr/core/src/java/org/apache/solr/response/transform/ValueAugmenterFactory.java b/solr/core/src/java/org/apache/solr/response/transform/ValueAugmenterFactory.java index d85a302f507..178fae1ea1f 100644 --- a/solr/core/src/java/org/apache/solr/response/transform/ValueAugmenterFactory.java +++ b/solr/core/src/java/org/apache/solr/response/transform/ValueAugmenterFactory.java @@ -28,31 +28,28 @@ import org.apache.solr.util.DateMathParser; * * @since solr 4.0 */ -public class ValueAugmenterFactory extends TransformerFactory -{ +public class ValueAugmenterFactory extends TransformerFactory { protected Object value = null; protected Object defaultValue = null; @Override public void init(NamedList args) { - value = args.get( "value" ); - if( value == null ) { - defaultValue = args.get( "defaultValue" ); + value = args.get("value"); + if (value == null) { + defaultValue = args.get("defaultValue"); } } - public static Object getObjectFrom( String val, String type ) - { - if( type != null ) { + public static Object getObjectFrom(String val, String type) { + if (type != null) { try { - if( "int".equals( type ) ) return Integer.valueOf( val ); - if( "double".equals( type ) ) return Double.valueOf( val ); - if( "float".equals( type ) ) return Float.valueOf( val ); - if( "date".equals( type ) ) return DateMathParser.parseMath(null, val ); - } - catch( Exception ex ) { - throw new SolrException( ErrorCode.BAD_REQUEST, - "Unable to parse "+type+"="+val, ex ); + if ("int".equals(type)) return Integer.valueOf(val); + if ("double".equals(type)) return Double.valueOf(val); + if ("float".equals(type)) return Float.valueOf(val); + if ("date".equals(type)) return DateMathParser.parseMath(null, val); + } catch (Exception ex) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Unable to parse " + type + "=" + val, ex); } } return val; @@ -61,43 +58,40 @@ public class ValueAugmenterFactory extends TransformerFactory @Override public DocTransformer create(String field, SolrParams params, SolrQueryRequest req) { Object val = value; - if( val == null ) { + if (val == null) { String v = params.get("v"); - if( v == null ) { + if (v == null) { val = defaultValue; - } - else { + } else { val = getObjectFrom(v, params.get("t")); } - if( val == null ) { - throw new SolrException( ErrorCode.BAD_REQUEST, - "ValueAugmenter is missing a value -- should be defined in solrconfig or inline" ); + if (val == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "ValueAugmenter is missing a value -- should be defined in solrconfig or inline"); } } - return new ValueAugmenter( field, val ); - } -} - -class ValueAugmenter extends DocTransformer -{ - final String name; - final Object value; - - public ValueAugmenter( String name, Object value ) - { - this.name = name; - this.value = value; - } - - @Override - public String getName() - { - return name; - } - - @Override - public void transform(SolrDocument doc, int docid) { - doc.setField( name, value ); + return new ValueAugmenter(field, val); + } + + + static class ValueAugmenter extends DocTransformer { + final String name; + final Object value; + + public ValueAugmenter(String name, Object value) { + this.name = name; + this.value = value; + } + + @Override + public String getName() { + return name; + } + + @Override + public void transform(SolrDocument doc, int docid) { + doc.setField(name, value); + } } } diff --git a/solr/core/src/java/org/apache/solr/search/facet/FacetParser.java b/solr/core/src/java/org/apache/solr/search/facet/FacetParser.java index 308228b31e9..d8bb697503a 100644 --- a/solr/core/src/java/org/apache/solr/search/facet/FacetParser.java +++ b/solr/core/src/java/org/apache/solr/search/facet/FacetParser.java @@ -16,18 +16,22 @@ */ package org.apache.solr.search.facet; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.Optional; + import org.apache.solr.common.SolrException; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.StrUtils; +import org.apache.solr.search.FunctionQParser; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.schema.IndexSchema; -import org.apache.solr.search.FunctionQParser; +import org.apache.solr.search.QParser; import org.apache.solr.search.SyntaxError; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import static org.apache.solr.common.params.CommonParams.SORT; abstract class FacetParser { protected FacetRequestT facet; @@ -134,9 +138,9 @@ abstract class FacetParser { switch (type) { case "field": case "terms": - return new FacetRequest.FacetFieldParser(this, key).parse(args); + return new FacetFieldParser(this, key).parse(args); case "query": - return new FacetRequest.FacetQueryParser(this, key).parse(args); + return new FacetQueryParser(this, key).parse(args); case "range": return new FacetRangeParser(this, key).parse(args); case "heatmap": @@ -411,4 +415,223 @@ abstract class FacetParser { nl.addAll(jsonObject); return SolrParams.toSolrParams(nl); } + + // TODO Make this private (or at least not static) and introduce + // a newInstance method on FacetParser that returns one of these? + static class FacetTopParser extends FacetParser { + private SolrQueryRequest req; + + public FacetTopParser(SolrQueryRequest req) { + super(null, "facet"); + this.facet = new FacetQuery(); + this.req = req; + } + + @Override + public FacetQuery parse(Object args) throws SyntaxError { + parseSubs(args); + return facet; + } + + @Override + public SolrQueryRequest getSolrRequest() { + return req; + } + + @Override + public IndexSchema getSchema() { + return req.getSchema(); + } + } + + static class FacetQueryParser extends FacetParser { + public FacetQueryParser(@SuppressWarnings("rawtypes") FacetParser parent, String key) { + super(parent, key); + facet = new FacetQuery(); + } + + @Override + public FacetQuery parse(Object arg) throws SyntaxError { + parseCommonParams(arg); + + String qstring = null; + if (arg instanceof String) { + // just the field name... + qstring = (String)arg; + + } else if (arg instanceof Map) { + @SuppressWarnings({"unchecked"}) + Map m = (Map) arg; + qstring = getString(m, "q", null); + if (qstring == null) { + qstring = getString(m, "query", null); + } + + // OK to parse subs before we have parsed our own query? + // as long as subs don't need to know about it. + parseSubs( m.get("facet") ); + } else if (arg != null) { + // something lke json.facet.facet.query=2 + throw err("Expected string/map for facet query, received " + arg.getClass().getSimpleName() + "=" + arg); + } + + // TODO: substats that are from defaults!!! + + if (qstring != null) { + QParser parser = QParser.getParser(qstring, getSolrRequest()); + parser.setIsFilter(true); + facet.q = parser.getQuery(); + } + + return facet; + } + } + + /*** not a separate type of parser for now... + static class FacetBlockParentParser extends FacetParser { + public FacetBlockParentParser(FacetParser parent, String key) { + super(parent, key); + facet = new FacetBlockParent(); + } + + @Override + public FacetBlockParent parse(Object arg) throws SyntaxError { + parseCommonParams(arg); + + if (arg instanceof String) { + // just the field name... + facet.parents = (String)arg; + + } else if (arg instanceof Map) { + Map m = (Map) arg; + facet.parents = getString(m, "parents", null); + + parseSubs( m.get("facet") ); + } + + return facet; + } + } + ***/ + + static class FacetFieldParser extends FacetParser { + @SuppressWarnings({"rawtypes"}) + public FacetFieldParser(FacetParser parent, String key) { + super(parent, key); + facet = new FacetField(); + } + + public FacetField parse(Object arg) throws SyntaxError { + parseCommonParams(arg); + if (arg instanceof String) { + // just the field name... + facet.field = (String)arg; + + } else if (arg instanceof Map) { + @SuppressWarnings({"unchecked"}) + Map m = (Map) arg; + facet.field = getField(m); + facet.offset = getLong(m, "offset", facet.offset); + facet.limit = getLong(m, "limit", facet.limit); + facet.overrequest = (int) getLong(m, "overrequest", facet.overrequest); + facet.overrefine = (int) getLong(m, "overrefine", facet.overrefine); + if (facet.limit == 0) facet.offset = 0; // normalize. an offset with a limit of non-zero isn't useful. + facet.mincount = getLong(m, "mincount", facet.mincount); + facet.missing = getBoolean(m, "missing", facet.missing); + facet.numBuckets = getBoolean(m, "numBuckets", facet.numBuckets); + facet.prefix = getString(m, "prefix", facet.prefix); + facet.allBuckets = getBoolean(m, "allBuckets", facet.allBuckets); + facet.method = FacetField.FacetMethod.fromString(getString(m, "method", null)); + facet.cacheDf = (int)getLong(m, "cacheDf", facet.cacheDf); + + // TODO: pull up to higher level? + facet.refine = FacetRequest.RefineMethod.fromObj(m.get("refine")); + + facet.perSeg = getBooleanOrNull(m, "perSeg"); + + // facet.sort may depend on a facet stat... + // should we be parsing / validating this here, or in the execution environment? + Object o = m.get("facet"); + parseSubs(o); + + facet.sort = parseAndValidateSort(facet, m, SORT); + facet.prelim_sort = parseAndValidateSort(facet, m, "prelim_sort"); + } else if (arg != null) { + // something like json.facet.facet.field=2 + throw err("Expected string/map for facet field, received " + arg.getClass().getSimpleName() + "=" + arg); + } + + if (null == facet.sort) { + facet.sort = FacetRequest.FacetSort.COUNT_DESC; + } + + return facet; + } + + /** + * Parses, validates and returns the {@link FacetRequest.FacetSort} for given sortParam + * and facet field + *

+ * Currently, supported sort specifications are 'mystat desc' OR {mystat: 'desc'} + * index - This is equivalent to 'index asc' + * count - This is equivalent to 'count desc' + *

+ * + * @param facet {@link FacetField} for which sort needs to be parsed and validated + * @param args map containing the sortVal for given sortParam + * @param sortParam parameter for which sort needs to parsed and validated + * @return parsed facet sort + */ + private static FacetRequest.FacetSort parseAndValidateSort(FacetField facet, Map args, String sortParam) { + Object sort = args.get(sortParam); + if (sort == null) { + return null; + } + + FacetRequest.FacetSort facetSort = null; + + if (sort instanceof String) { + String sortStr = (String)sort; + if (sortStr.endsWith(" asc")) { + facetSort = new FacetRequest.FacetSort(sortStr.substring(0, sortStr.length()-" asc".length()), + FacetRequest.SortDirection.asc); + } else if (sortStr.endsWith(" desc")) { + facetSort = new FacetRequest.FacetSort(sortStr.substring(0, sortStr.length()-" desc".length()), + FacetRequest.SortDirection.desc); + } else { + facetSort = new FacetRequest.FacetSort(sortStr, + // default direction for "index" is ascending + ("index".equals(sortStr) + ? FacetRequest.SortDirection.asc + : FacetRequest.SortDirection.desc)); + } + } else if (sort instanceof Map) { + // { myvar : 'desc' } + @SuppressWarnings("unchecked") + Optional> optional = ((Map)sort).entrySet().stream().findFirst(); + if (optional.isPresent()) { + Map.Entry entry = optional.get(); + facetSort = new FacetRequest.FacetSort(entry.getKey(), FacetRequest.SortDirection.fromObj(entry.getValue())); + } + } else { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Expected string/map for '" + sortParam +"', received "+ sort.getClass().getSimpleName() + "=" + sort); + } + + Map facetStats = facet.facetStats; + // validate facet sort + boolean isValidSort = facetSort == null || + "index".equals(facetSort.sortVariable) || + "count".equals(facetSort.sortVariable) || + (facetStats != null && facetStats.containsKey(facetSort.sortVariable)); + + if (!isValidSort) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Invalid " + sortParam + " option '" + sort + "' for field '" + facet.field + "'"); + } + return facetSort; + } + + } + } diff --git a/solr/core/src/java/org/apache/solr/search/facet/FacetRequest.java b/solr/core/src/java/org/apache/solr/search/facet/FacetRequest.java index 42f8488948c..db9d9c9c9cc 100644 --- a/solr/core/src/java/org/apache/solr/search/facet/FacetRequest.java +++ b/solr/core/src/java/org/apache/solr/search/facet/FacetRequest.java @@ -21,16 +21,13 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import org.apache.lucene.search.Query; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.SolrParams; import org.apache.solr.request.SolrQueryRequest; -import org.apache.solr.schema.IndexSchema; import org.apache.solr.search.DocSet; import org.apache.solr.search.JoinQParserPlugin; -import org.apache.solr.search.QParser; import org.apache.solr.search.QueryContext; import org.apache.solr.search.SolrConstantScoreQuery; import org.apache.solr.search.SyntaxError; @@ -38,7 +35,6 @@ import org.apache.solr.search.join.GraphQuery; import org.apache.solr.search.join.GraphQueryParser; import org.apache.solr.util.RTimer; -import static org.apache.solr.common.params.CommonParams.SORT; import static org.apache.solr.search.facet.FacetRequest.RefineMethod.NONE; /** @@ -302,7 +298,7 @@ public abstract class FacetRequest { */ public static FacetRequest parse(SolrQueryRequest req, Map params) { @SuppressWarnings({"rawtypes"}) - FacetParser parser = new FacetTopParser(req); + FacetParser parser = new FacetParser.FacetTopParser(req); try { return parser.parse(params); } catch (SyntaxError syntaxError) { @@ -321,7 +317,7 @@ public abstract class FacetRequest { */ public static FacetRequest parseOneFacetReq(SolrQueryRequest req, Map params) { @SuppressWarnings("rawtypes") - FacetParser parser = new FacetTopParser(req); + FacetParser parser = new FacetParser.FacetTopParser(req); try { return (FacetRequest) parser.parseFacetOrStat("", params); } catch (SyntaxError syntaxError) { @@ -437,221 +433,6 @@ public abstract class FacetRequest { public abstract Map getFacetDescription(); - static class FacetTopParser extends FacetParser { - private SolrQueryRequest req; - - public FacetTopParser(SolrQueryRequest req) { - super(null, "facet"); - this.facet = new FacetQuery(); - this.req = req; - } - - @Override - public FacetQuery parse(Object args) throws SyntaxError { - parseSubs(args); - return facet; - } - - @Override - public SolrQueryRequest getSolrRequest() { - return req; - } - - @Override - public IndexSchema getSchema() { - return req.getSchema(); - } - } - - static class FacetQueryParser extends FacetParser { - public FacetQueryParser(@SuppressWarnings("rawtypes") FacetParser parent, String key) { - super(parent, key); - facet = new FacetQuery(); - } - - @Override - public FacetQuery parse(Object arg) throws SyntaxError { - parseCommonParams(arg); - - String qstring = null; - if (arg instanceof String) { - // just the field name... - qstring = (String)arg; - - } else if (arg instanceof Map) { - @SuppressWarnings({"unchecked"}) - Map m = (Map) arg; - qstring = getString(m, "q", null); - if (qstring == null) { - qstring = getString(m, "query", null); - } - - // OK to parse subs before we have parsed our own query? - // as long as subs don't need to know about it. - parseSubs( m.get("facet") ); - } else if (arg != null) { - // something lke json.facet.facet.query=2 - throw err("Expected string/map for facet query, received " + arg.getClass().getSimpleName() + "=" + arg); - } - - // TODO: substats that are from defaults!!! - - if (qstring != null) { - QParser parser = QParser.getParser(qstring, getSolrRequest()); - parser.setIsFilter(true); - facet.q = parser.getQuery(); - } - - return facet; - } - } - -/*** not a separate type of parser for now... -static class FacetBlockParentParser extends FacetParser { - public FacetBlockParentParser(FacetParser parent, String key) { - super(parent, key); - facet = new FacetBlockParent(); - } - - @Override - public FacetBlockParent parse(Object arg) throws SyntaxError { - parseCommonParams(arg); - - if (arg instanceof String) { - // just the field name... - facet.parents = (String)arg; - - } else if (arg instanceof Map) { - Map m = (Map) arg; - facet.parents = getString(m, "parents", null); - - parseSubs( m.get("facet") ); - } - - return facet; - } - } - ***/ - - static class FacetFieldParser extends FacetParser { - @SuppressWarnings({"rawtypes"}) - public FacetFieldParser(FacetParser parent, String key) { - super(parent, key); - facet = new FacetField(); - } - - public FacetField parse(Object arg) throws SyntaxError { - parseCommonParams(arg); - if (arg instanceof String) { - // just the field name... - facet.field = (String)arg; - - } else if (arg instanceof Map) { - @SuppressWarnings({"unchecked"}) - Map m = (Map) arg; - facet.field = getField(m); - facet.offset = getLong(m, "offset", facet.offset); - facet.limit = getLong(m, "limit", facet.limit); - facet.overrequest = (int) getLong(m, "overrequest", facet.overrequest); - facet.overrefine = (int) getLong(m, "overrefine", facet.overrefine); - if (facet.limit == 0) facet.offset = 0; // normalize. an offset with a limit of non-zero isn't useful. - facet.mincount = getLong(m, "mincount", facet.mincount); - facet.missing = getBoolean(m, "missing", facet.missing); - facet.numBuckets = getBoolean(m, "numBuckets", facet.numBuckets); - facet.prefix = getString(m, "prefix", facet.prefix); - facet.allBuckets = getBoolean(m, "allBuckets", facet.allBuckets); - facet.method = FacetField.FacetMethod.fromString(getString(m, "method", null)); - facet.cacheDf = (int)getLong(m, "cacheDf", facet.cacheDf); - - // TODO: pull up to higher level? - facet.refine = RefineMethod.fromObj(m.get("refine")); - - facet.perSeg = getBooleanOrNull(m, "perSeg"); - - // facet.sort may depend on a facet stat... - // should we be parsing / validating this here, or in the execution environment? - Object o = m.get("facet"); - parseSubs(o); - - facet.sort = parseAndValidateSort(facet, m, SORT); - facet.prelim_sort = parseAndValidateSort(facet, m, "prelim_sort"); - } else if (arg != null) { - // something like json.facet.facet.field=2 - throw err("Expected string/map for facet field, received " + arg.getClass().getSimpleName() + "=" + arg); - } - - if (null == facet.sort) { - facet.sort = FacetSort.COUNT_DESC; - } - - return facet; - } - - /** - * Parses, validates and returns the {@link FacetSort} for given sortParam - * and facet field - *

- * Currently, supported sort specifications are 'mystat desc' OR {mystat: 'desc'} - * index - This is equivalent to 'index asc' - * count - This is equivalent to 'count desc' - *

- * - * @param facet {@link FacetField} for which sort needs to be parsed and validated - * @param args map containing the sortVal for given sortParam - * @param sortParam parameter for which sort needs to parsed and validated - * @return parsed facet sort - */ - private static FacetSort parseAndValidateSort(FacetField facet, Map args, String sortParam) { - Object sort = args.get(sortParam); - if (sort == null) { - return null; - } - - FacetSort facetSort = null; - - if (sort instanceof String) { - String sortStr = (String)sort; - if (sortStr.endsWith(" asc")) { - facetSort = new FacetSort(sortStr.substring(0, sortStr.length()-" asc".length()), - SortDirection.asc); - } else if (sortStr.endsWith(" desc")) { - facetSort = new FacetSort(sortStr.substring(0, sortStr.length()-" desc".length()), - SortDirection.desc); - } else { - facetSort = new FacetSort(sortStr, - // default direction for "index" is ascending - ("index".equals(sortStr) - ? SortDirection.asc - : SortDirection.desc)); - } - } else if (sort instanceof Map) { - // { myvar : 'desc' } - @SuppressWarnings("unchecked") - Optional> optional = ((Map)sort).entrySet().stream().findFirst(); - if (optional.isPresent()) { - Map.Entry entry = optional.get(); - facetSort = new FacetSort(entry.getKey(), SortDirection.fromObj(entry.getValue())); - } - } else { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Expected string/map for '" + sortParam +"', received "+ sort.getClass().getSimpleName() + "=" + sort); - } - - Map facetStats = facet.facetStats; - // validate facet sort - boolean isValidSort = facetSort == null || - "index".equals(facetSort.sortVariable) || - "count".equals(facetSort.sortVariable) || - (facetStats != null && facetStats.containsKey(facetSort.sortVariable)); - - if (!isValidSort) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Invalid " + sortParam + " option '" + sort + "' for field '" + facet.field + "'"); - } - return facetSort; - } - - } } diff --git a/solr/core/src/java/org/apache/solr/search/join/GraphTermsCollector.java b/solr/core/src/java/org/apache/solr/search/join/GraphEdgeCollector.java similarity index 59% rename from solr/core/src/java/org/apache/solr/search/join/GraphTermsCollector.java rename to solr/core/src/java/org/apache/solr/search/join/GraphEdgeCollector.java index 6ca02d35728..02ed123f3a7 100644 --- a/solr/core/src/java/org/apache/solr/search/join/GraphTermsCollector.java +++ b/solr/core/src/java/org/apache/solr/search/join/GraphEdgeCollector.java @@ -53,7 +53,7 @@ abstract class GraphEdgeCollector extends SimpleCollector implements Collector { // known leaf nodes DocSet leafNodes; - int numHits=0; // number of documents visited + int numHits = 0; // number of documents visited BitSet bits; // if not null, used to collect documents visited int base; @@ -74,8 +74,10 @@ abstract class GraphEdgeCollector extends SimpleCollector implements Collector { } // the number of docs visited - public int getNumHits() { return numHits; } - + public int getNumHits() { + return numHits; + } + public void collect(int segDoc) throws IOException { int doc = segDoc + base; if (skipSet != null && skipSet.exists(doc)) { @@ -91,19 +93,19 @@ abstract class GraphEdgeCollector extends SimpleCollector implements Collector { // Optimization to not look up edges for a document that is a leaf node (i.e. has no outgoing edges) if (leafNodes == null || !leafNodes.exists(doc)) { addEdgeIdsToResult(segDoc); - } + } // Note: tracking links in for each result would be a huge memory hog... so not implementing at this time. } - + abstract void addEdgeIdsToResult(int doc) throws IOException; - + private void addDocToResult(int docWithBase) { // this document is part of the traversal. mark it in our bitmap. bits.set(docWithBase); // increment the hit count so we know how many docs we traversed this time. numHits++; } - + @Override public void doSetNextReader(LeafReaderContext context) throws IOException { base = context.docBase; @@ -115,87 +117,90 @@ abstract class GraphEdgeCollector extends SimpleCollector implements Collector { public ScoreMode scoreMode() { return ScoreMode.COMPLETE_NO_SCORES; } - -} - -class GraphTermsCollector extends GraphEdgeCollector { - // all the collected terms - private BytesRefHash collectorTerms; - private SortedSetDocValues docTermOrds; - GraphTermsCollector(SchemaField collectField, DocSet skipSet, DocSet leafNodes) { - super(collectField, skipSet, leafNodes); - this.collectorTerms = new BytesRefHash(); - } + static class GraphTermsCollector extends GraphEdgeCollector { + // all the collected terms + private BytesRefHash collectorTerms; + private SortedSetDocValues docTermOrds; - @Override - public void doSetNextReader(LeafReaderContext context) throws IOException { - super.doSetNextReader(context); - // Grab the updated doc values. - docTermOrds = DocValues.getSortedSet(context.reader(), collectField.getName()); - } - @Override - void addEdgeIdsToResult(int doc) throws IOException { - // set the doc to pull the edges ids for. - if (doc > docTermOrds.docID()) { - docTermOrds.advance(doc); + GraphTermsCollector(SchemaField collectField, DocSet skipSet, DocSet leafNodes) { + super(collectField, skipSet, leafNodes); + this.collectorTerms = new BytesRefHash(); } - if (doc == docTermOrds.docID()) { - BytesRef edgeValue = new BytesRef(); - long ord; - while ((ord = docTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) { - edgeValue = docTermOrds.lookupOrd(ord); - // add the edge id to the collector terms. - collectorTerms.add(edgeValue); + + @Override + public void doSetNextReader(LeafReaderContext context) throws IOException { + super.doSetNextReader(context); + // Grab the updated doc values. + docTermOrds = DocValues.getSortedSet(context.reader(), collectField.getName()); + } + + @Override + void addEdgeIdsToResult(int doc) throws IOException { + // set the doc to pull the edges ids for. + if (doc > docTermOrds.docID()) { + docTermOrds.advance(doc); } - } - } - - @Override - public Query getResultQuery(SchemaField matchField, boolean useAutomaton) { - if (collectorTerms == null || collectorTerms.size() == 0) { - // return null if there are no terms (edges) to traverse. - return null; - } else { - // Create a query - Query q = null; - - // TODO: see if we should dynamically select this based on the frontier size. - if (useAutomaton) { - // build an automaton based query for the frontier. - Automaton autn = buildAutomaton(collectorTerms); - AutomatonQuery autnQuery = new AutomatonQuery(new Term(matchField.getName()), autn); - q = autnQuery; - } else { - List termList = new ArrayList<>(collectorTerms.size()); - for (int i = 0 ; i < collectorTerms.size(); i++) { - BytesRef ref = new BytesRef(); - collectorTerms.get(i, ref); - termList.add(ref); + if (doc == docTermOrds.docID()) { + BytesRef edgeValue = new BytesRef(); + long ord; + while ((ord = docTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) { + edgeValue = docTermOrds.lookupOrd(ord); + // add the edge id to the collector terms. + collectorTerms.add(edgeValue); } - q = (matchField.hasDocValues() && !matchField.indexed()) - ? new DocValuesTermsQuery(matchField.getName(), termList) - : new TermInSetQuery(matchField.getName(), termList); } - - return q; } - } + @Override + public Query getResultQuery(SchemaField matchField, boolean useAutomaton) { + if (collectorTerms == null || collectorTerms.size() == 0) { + // return null if there are no terms (edges) to traverse. + return null; + } else { + // Create a query + Query q = null; - /** Build an automaton to represent the frontier query */ - private Automaton buildAutomaton(BytesRefHash termBytesHash) { - // need top pass a sorted set of terms to the autn builder (maybe a better way to avoid this?) - final TreeSet terms = new TreeSet(); - for (int i = 0 ; i < termBytesHash.size(); i++) { - BytesRef ref = new BytesRef(); - termBytesHash.get(i, ref); - terms.add(ref); + // TODO: see if we should dynamically select this based on the frontier size. + if (useAutomaton) { + // build an automaton based query for the frontier. + Automaton autn = buildAutomaton(collectorTerms); + AutomatonQuery autnQuery = new AutomatonQuery(new Term(matchField.getName()), autn); + q = autnQuery; + } else { + List termList = new ArrayList<>(collectorTerms.size()); + for (int i = 0; i < collectorTerms.size(); i++) { + BytesRef ref = new BytesRef(); + collectorTerms.get(i, ref); + termList.add(ref); + } + q = (matchField.hasDocValues() && !matchField.indexed()) + ? new DocValuesTermsQuery(matchField.getName(), termList) + : new TermInSetQuery(matchField.getName(), termList); + } + + return q; + } } - final Automaton a = DaciukMihovAutomatonBuilder.build(terms); - return a; + + + /** + * Build an automaton to represent the frontier query + */ + private Automaton buildAutomaton(BytesRefHash termBytesHash) { + // need top pass a sorted set of terms to the autn builder (maybe a better way to avoid this?) + final TreeSet terms = new TreeSet(); + for (int i = 0; i < termBytesHash.size(); i++) { + BytesRef ref = new BytesRef(); + termBytesHash.get(i, ref); + terms.add(ref); + } + final Automaton a = DaciukMihovAutomatonBuilder.build(terms); + return a; + } + } } diff --git a/solr/core/src/java/org/apache/solr/search/join/GraphQuery.java b/solr/core/src/java/org/apache/solr/search/join/GraphQuery.java index 5bec5997dc0..c25679b9b4c 100644 --- a/solr/core/src/java/org/apache/solr/search/join/GraphQuery.java +++ b/solr/core/src/java/org/apache/solr/search/join/GraphQuery.java @@ -200,7 +200,7 @@ public class GraphQuery extends Query { // Create the graph result collector for this level GraphEdgeCollector graphResultCollector = collectSchemaField.getType().isPointField() ? new GraphPointsCollector(collectSchemaField, new BitDocSet(resultBits), leafNodes) - : new GraphTermsCollector(collectSchemaField, new BitDocSet(resultBits), leafNodes); + : new GraphEdgeCollector.GraphTermsCollector(collectSchemaField, new BitDocSet(resultBits), leafNodes); fromSet = new BitDocSet(new FixedBitSet(capacity)); graphResultCollector.setCollectDocs(fromSet.getBits()); diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java index 9fd2f2f5bd2..e9ced2ee8d3 100644 --- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java +++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java @@ -69,7 +69,7 @@ public class TransactionLog implements Closeable { private boolean debug = log.isDebugEnabled(); private boolean trace = log.isTraceEnabled(); - public final static String END_MESSAGE="SOLR_TLOG_END"; + public final static String END_MESSAGE = "SOLR_TLOG_END"; long id; File tlogFile; @@ -83,7 +83,7 @@ public class TransactionLog implements Closeable { protected volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery) AtomicInteger refcount = new AtomicInteger(1); - Map globalStringMap = new HashMap<>(); + Map globalStringMap = new HashMap<>(); List globalStringList = new ArrayList<>(); // write a BytesRef as a byte array @@ -91,13 +91,13 @@ public class TransactionLog implements Closeable { @Override public Object resolve(Object o, JavaBinCodec codec) throws IOException { if (o instanceof BytesRef) { - BytesRef br = (BytesRef)o; + BytesRef br = (BytesRef) o; codec.writeByteArray(br.bytes, br.offset, br.length); return null; } // Fallback: we have no idea how to serialize this. Be noisy to prevent insidious bugs throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "TransactionLog doesn't know how to serialize " + o.getClass() + "; try implementing ObjectResolver?"); + "TransactionLog doesn't know how to serialize " + o.getClass() + "; try implementing ObjectResolver?"); } }; @@ -167,12 +167,12 @@ public class TransactionLog implements Closeable { try { if (debug) { log.debug("New TransactionLog file= {}, exists={}, size={} openExisting={}" - , tlogFile, tlogFile.exists(), tlogFile.length(), openExisting); + , tlogFile, tlogFile.exists(), tlogFile.length(), openExisting); } // Parse tlog id from the filename String filename = tlogFile.getName(); - id = Long.parseLong(filename.substring(filename.lastIndexOf('.')+1)); + id = Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1)); this.tlogFile = tlogFile; raf = new RandomAccessFile(this.tlogFile, "rw"); @@ -197,7 +197,7 @@ public class TransactionLog implements Closeable { log.warn("New transaction log already exists:{} size={}", tlogFile, raf.length()); return; } - + if (start > 0) { raf.setLength(0); } @@ -205,7 +205,7 @@ public class TransactionLog implements Closeable { } success = true; - + assert ObjectReleaseTracker.track(this); } catch (IOException e) { @@ -222,7 +222,8 @@ public class TransactionLog implements Closeable { } // for subclasses - protected TransactionLog() {} + protected TransactionLog() { + } /** Returns the number of records in the log (currently includes the header and an optional commit). * Note: currently returns 0 for reopened existing log files. @@ -241,12 +242,12 @@ public class TransactionLog implements Closeable { } // the end of the file should have the end message (added during a commit) plus a 4 byte size - byte[] buf = new byte[ END_MESSAGE.length() ]; + byte[] buf = new byte[END_MESSAGE.length()]; long pos = size - END_MESSAGE.length() - 4; if (pos < 0) return false; @SuppressWarnings("resource") final ChannelFastInputStream is = new ChannelFastInputStream(channel, pos); is.read(buf); - for (int i=0; i)header.get("strings"); + globalStringList = (List) header.get("strings"); globalStringMap = new HashMap<>(globalStringList.size()); - for (int i=0; i(); - header.put("SOLR_TLOG",1); // a magic string + version number - header.put("strings",globalStringList); + Map header = new LinkedHashMap(); + header.put("SOLR_TLOG", 1); // a magic string + version number + header.put("strings", globalStringList); codec.marshal(header, fos); endRecord(pos); } protected void endRecord(long startRecordPosition) throws IOException { - fos.writeInt((int)(fos.size() - startRecordPosition)); + fos.writeInt((int) (fos.size() - startRecordPosition)); numRecords++; } @@ -347,7 +348,7 @@ public class TransactionLog implements Closeable { * the command to the transaction log.) * @param cmd The add update command to be written * @return Returns the position pointer of the written update command - * + * * @see #write(AddUpdateCommand, long) */ public long write(AddUpdateCommand cmd) { @@ -357,14 +358,14 @@ public class TransactionLog implements Closeable { /** * Writes an add update command to the transaction log. This should be called only for * writing in-place updates, or else pass -1 as the prevPointer. - * @param cmd The add update command to be written - * @param prevPointer The pointer in the transaction log which this update depends - * on (applicable for in-place updates) + * @param cmd The add update command to be written + * @param prevPointer The pointer in the transaction log which this update depends + * on (applicable for in-place updates) * @return Returns the position pointer of the written update command */ public long write(AddUpdateCommand cmd, long prevPointer) { assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer))); - + LogCodec codec = new LogCodec(resolver); SolrInputDocument sdoc = cmd.getSolrInputDocument(); @@ -374,7 +375,7 @@ public class TransactionLog implements Closeable { // adaptive buffer sizing int bufSize = lastAddSize; // unsynchronized access of lastAddSize should be fine // at least 256 bytes and at most 1 MB - bufSize = Math.min(1024*1024, Math.max(256, bufSize+(bufSize>>3)+256)); + bufSize = Math.min(1024 * 1024, Math.max(256, bufSize + (bufSize >> 3) + 256)); MemOutputStream out = new MemOutputStream(new byte[bufSize]); codec.init(out); @@ -391,7 +392,7 @@ public class TransactionLog implements Closeable { codec.writeLong(cmd.getVersion()); codec.writeSolrInputDocument(cmd.getSolrInputDocument()); } - lastAddSize = (int)out.size(); + lastAddSize = (int) out.size(); synchronized (this) { long pos = fos.size(); // if we had flushed, this should be equal to channel.position() @@ -465,9 +466,9 @@ public class TransactionLog implements Closeable { // fos.flushBuffer(); // flush later return pos; } - } catch (IOException e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); - } + } catch (IOException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } } @@ -515,10 +516,10 @@ public class TransactionLog implements Closeable { fos.flushBuffer(); /*** System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos); - if (fos.size() != raf.length() || pos >= fos.size() ) { - throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos); - } - ***/ + if (fos.size() != raf.length() || pos >= fos.size() ) { + throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos); + } + ***/ } ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos); @@ -633,7 +634,8 @@ public class TransactionLog implements Closeable { /** Returns a reader that can be used while a log is still in use. * Currently only *one* LogReader may be outstanding, and that log may only - * be used from a single thread. */ + * be used from a single thread. + */ public LogReader getReader(long startingPos) { return new LogReader(startingPos); } @@ -744,7 +746,7 @@ public class TransactionLog implements Closeable { long pos = startingPos; long lastVersion = Long.MIN_VALUE; - while ( (o = super.next()) != null) { + while ((o = super.next()) != null) { List entry = (List) o; long version = (Long) entry.get(UpdateLog.VERSION_IDX); version = Math.abs(version); @@ -780,10 +782,11 @@ public class TransactionLog implements Closeable { /* returns the position in the log file of the last record returned by next() */ public abstract long position(); + public abstract void close(); @Override - public abstract String toString() ; + public abstract String toString(); } @@ -812,7 +815,7 @@ public class TransactionLog implements Closeable { } fis = new ChannelFastInputStream(channel, 0); - if (sz >=4) { + if (sz >= 4) { // readHeader(fis); // should not be needed prevPos = sz - 4; fis.seek(prevPos); @@ -843,7 +846,7 @@ public class TransactionLog implements Closeable { } else { // Position buffer so that this record is at the end. // For small records, this will cause subsequent calls to next() to be within the buffer. - long seekPos = endOfThisRecord - fis.getBufferSize(); + long seekPos = endOfThisRecord - fis.getBufferSize(); seekPos = Math.min(seekPos, prevPos); // seek to the start of the record if it's larger then the block size. seekPos = Math.max(seekPos, 0); fis.seek(seekPos); @@ -880,57 +883,54 @@ public class TransactionLog implements Closeable { } -} + static class ChannelFastInputStream extends FastInputStream { + private FileChannel ch; - - -class ChannelFastInputStream extends FastInputStream { - private FileChannel ch; - - public ChannelFastInputStream(FileChannel ch, long chPosition) { - // super(null, new byte[10],0,0); // a small buffer size for testing purposes - super(null); - this.ch = ch; - super.readFromStream = chPosition; - } - - @Override - public int readWrappedStream(byte[] target, int offset, int len) throws IOException { - ByteBuffer bb = ByteBuffer.wrap(target, offset, len); - int ret = ch.read(bb, readFromStream); - return ret; - } - - public void seek(long position) throws IOException { - if (position <= readFromStream && position >= getBufferPos()) { - // seek within buffer - pos = (int)(position - getBufferPos()); - } else { - // long currSize = ch.size(); // not needed - underlying read should handle (unless read never done) - // if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch); - readFromStream = position; - end = pos = 0; + public ChannelFastInputStream(FileChannel ch, long chPosition) { + // super(null, new byte[10],0,0); // a small buffer size for testing purposes + super(null); + this.ch = ch; + super.readFromStream = chPosition; + } + + @Override + public int readWrappedStream(byte[] target, int offset, int len) throws IOException { + ByteBuffer bb = ByteBuffer.wrap(target, offset, len); + int ret = ch.read(bb, readFromStream); + return ret; + } + + public void seek(long position) throws IOException { + if (position <= readFromStream && position >= getBufferPos()) { + // seek within buffer + pos = (int) (position - getBufferPos()); + } else { + // long currSize = ch.size(); // not needed - underlying read should handle (unless read never done) + // if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch); + readFromStream = position; + end = pos = 0; + } + assert position() == position; } - assert position() == position; - } /** where is the start of the buffer relative to the whole file */ - public long getBufferPos() { - return readFromStream - end; - } + public long getBufferPos() { + return readFromStream - end; + } - public int getBufferSize() { - return buf.length; - } + public int getBufferSize() { + return buf.length; + } - @Override - public void close() throws IOException { - ch.close(); - } + @Override + public void close() throws IOException { + ch.close(); + } - @Override - public String toString() { - return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ; + @Override + public String toString() { + return "readFromStream=" + readFromStream + " pos=" + pos + " end=" + end + " bufferPos=" + getBufferPos() + " position=" + position(); + } } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java index 158900d65d0..8da2df75fb4 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java @@ -131,7 +131,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { while (nextInChain != null) { Class klass = nextInChain.getClass(); if (klass != LogUpdateProcessorFactory.LogUpdateProcessor.class - && klass != RunUpdateProcessor.class + && klass != RunUpdateProcessorFactory.RunUpdateProcessor.class && klass != TolerantUpdateProcessor.class) { shouldClone = true; break; diff --git a/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java index d49ab271921..a208d41cc9b 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java +++ b/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java @@ -33,14 +33,12 @@ import org.apache.solr.update.*; * @since solr 1.3 * @see DistributingUpdateProcessorFactory */ -public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory -{ +public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory { public static final String PRE_RUN_CHAIN_NAME = "_preRun_"; @Override - public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) - { + public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { RunUpdateProcessor runUpdateProcessor = new RunUpdateProcessor(req, next); UpdateRequestProcessorChain preRun = req.getCore().getUpdateProcessingChain(PRE_RUN_CHAIN_NAME); if (preRun != null) { @@ -49,82 +47,79 @@ public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory return runUpdateProcessor; } } -} -class RunUpdateProcessor extends UpdateRequestProcessor -{ - private final SolrQueryRequest req; - private final UpdateHandler updateHandler; - private boolean changesSinceCommit = false; + static class RunUpdateProcessor extends UpdateRequestProcessor { + private final SolrQueryRequest req; + private final UpdateHandler updateHandler; - public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) { - super( next ); - this.req = req; - this.updateHandler = req.getCore().getUpdateHandler(); - } + private boolean changesSinceCommit = false; - @Override - public void processAdd(AddUpdateCommand cmd) throws IOException { - - if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) { - throw new SolrException - (SolrException.ErrorCode.BAD_REQUEST, - "RunUpdateProcessor has received an AddUpdateCommand containing a document that appears to still contain Atomic document update operations, most likely because DistributedUpdateProcessorFactory was explicitly disabled from this updateRequestProcessorChain"); + public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) { + super(next); + this.req = req; + this.updateHandler = req.getCore().getUpdateHandler(); } - updateHandler.addDoc(cmd); - super.processAdd(cmd); - changesSinceCommit = true; - } + @Override + public void processAdd(AddUpdateCommand cmd) throws IOException { - @Override - public void processDelete(DeleteUpdateCommand cmd) throws IOException { - if( cmd.isDeleteById()) { - updateHandler.delete(cmd); + if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) { + throw new SolrException + (SolrException.ErrorCode.BAD_REQUEST, + "RunUpdateProcessor has received an AddUpdateCommand containing a document that appears to still contain Atomic document update operations, most likely because DistributedUpdateProcessorFactory was explicitly disabled from this updateRequestProcessorChain"); + } + + updateHandler.addDoc(cmd); + super.processAdd(cmd); + changesSinceCommit = true; } - else { - updateHandler.deleteByQuery(cmd); + + @Override + public void processDelete(DeleteUpdateCommand cmd) throws IOException { + if (cmd.isDeleteById()) { + updateHandler.delete(cmd); + } else { + updateHandler.deleteByQuery(cmd); + } + super.processDelete(cmd); + changesSinceCommit = true; } - super.processDelete(cmd); - changesSinceCommit = true; - } - @Override - public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { - updateHandler.mergeIndexes(cmd); - super.processMergeIndexes(cmd); - } + @Override + public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { + updateHandler.mergeIndexes(cmd); + super.processMergeIndexes(cmd); + } - @Override - public void processCommit(CommitUpdateCommand cmd) throws IOException - { - updateHandler.commit(cmd); - super.processCommit(cmd); - if (!cmd.softCommit) { - // a hard commit means we don't need to flush the transaction log + @Override + public void processCommit(CommitUpdateCommand cmd) throws IOException { + updateHandler.commit(cmd); + super.processCommit(cmd); + if (!cmd.softCommit) { + // a hard commit means we don't need to flush the transaction log + changesSinceCommit = false; + } + } + + /** + * @since Solr 1.4 + */ + @Override + public void processRollback(RollbackUpdateCommand cmd) throws IOException { + updateHandler.rollback(cmd); + super.processRollback(cmd); changesSinceCommit = false; } - } - - /** - * @since Solr 1.4 - */ - @Override - public void processRollback(RollbackUpdateCommand cmd) throws IOException - { - updateHandler.rollback(cmd); - super.processRollback(cmd); - changesSinceCommit = false; - } - @Override - public void finish() throws IOException { - if (changesSinceCommit && updateHandler.getUpdateLog() != null) { - updateHandler.getUpdateLog().finish(null); + @Override + public void finish() throws IOException { + if (changesSinceCommit && updateHandler.getUpdateLog() != null) { + updateHandler.getUpdateLog().finish(null); + } + super.finish(); } - super.finish(); } } diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacetRefinement.java b/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacetRefinement.java index 2a5dcd16d48..522b22ca67d 100644 --- a/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacetRefinement.java +++ b/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacetRefinement.java @@ -136,7 +136,7 @@ public class TestJsonFacetRefinement extends SolrTestCaseHS { try { int nShards = responsesAndTests.length / 2; Object jsonFacet = Utils.fromJSONString(facet); - FacetParser parser = new FacetRequest.FacetTopParser(req); + FacetParser parser = new FacetParser.FacetTopParser(req); FacetRequest facetRequest = parser.parse(jsonFacet); FacetMerger merger = null; diff --git a/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java b/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java index 66d612fa697..cbd69203b30 100644 --- a/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java +++ b/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java @@ -162,7 +162,7 @@ public class UpdateRequestProcessorFactoryTest extends SolrTestCaseJ4 { // for these 3 (distrib) chains, the last proc should always be RunUpdateProcessor assertTrue(name + " (distrib) last processor isn't a RunUpdateProcessor: " + procs.toString(), - procs.get(procs.size()-1) instanceof RunUpdateProcessor ); + procs.get(procs.size()-1) instanceof RunUpdateProcessorFactory.RunUpdateProcessor ); // either 1 proc was droped in distrib mode, or 1 for the "implicit" chain diff --git a/solr/solrj/src/java/org/noggit/CharArr.java b/solr/solrj/src/java/org/noggit/CharArr.java index 9ecc8e62609..0431e107a1d 100644 --- a/solr/solrj/src/java/org/noggit/CharArr.java +++ b/solr/solrj/src/java/org/noggit/CharArr.java @@ -225,170 +225,170 @@ public class CharArr implements CharSequence, Appendable { write(c); return this; } -} -class NullCharArr extends CharArr { - public NullCharArr() { - super(new char[1], 0, 0); - } - - @Override - public void unsafeWrite(char b) { - } - - @Override - public void unsafeWrite(char b[], int off, int len) { - } - - @Override - public void unsafeWrite(int b) { - } - - @Override - public void write(char b) { - } - - @Override - public void write(char b[], int off, int len) { - } - - @Override - public void reserve(int num) { - } - - @Override - protected void resize(int len) { - } - - @Override - public Appendable append(CharSequence csq, int start, int end) throws IOException { - return this; - } - - @Override - public char charAt(int index) { - return 0; - } - - @Override - public void write(String s, int stringOffset, int len) { - } -} - - -// IDEA: a subclass that refills the array from a reader? -class CharArrReader extends CharArr { - protected final Reader in; - - public CharArrReader(Reader in, int size) { - super(size); - this.in = in; - } - - @Override - public int read() throws IOException { - if (start >= end) fill(); - return start >= end ? -1 : buf[start++]; - } - - @Override - public int read(CharBuffer cb) throws IOException { - // empty the buffer and then read direct - int sz = size(); - if (sz > 0) cb.put(buf, start, end); - int sz2 = in.read(cb); - if (sz2 >= 0) return sz + sz2; - return sz > 0 ? sz : -1; - } - - @Override - public int fill() throws IOException { - if (start >= end) { - reset(); - } else if (start > 0) { - System.arraycopy(buf, start, buf, 0, size()); - end = size(); - start = 0; + static class NullCharArr extends CharArr { + public NullCharArr() { + super(new char[1], 0, 0); } - /*** - // fill fully or not??? - do { - int sz = in.read(buf,end,buf.length-end); - if (sz==-1) return; - end+=sz; - } while (end < buf.length); - ***/ - int sz = in.read(buf, end, buf.length - end); - if (sz > 0) end += sz; - return sz; - } - -} - - -class CharArrWriter extends CharArr { - protected Writer sink; - - @Override - public void flush() { - try { - sink.write(buf, start, end - start); - } catch (IOException e) { - throw new RuntimeException(e); + @Override + public void unsafeWrite(char b) { } - start = end = 0; - } - @Override - public void write(char b) { - if (end >= buf.length) { - flush(); + @Override + public void unsafeWrite(char b[], int off, int len) { + } + + @Override + public void unsafeWrite(int b) { + } + + @Override + public void write(char b) { + } + + @Override + public void write(char b[], int off, int len) { + } + + @Override + public void reserve(int num) { + } + + @Override + protected void resize(int len) { + } + + @Override + public Appendable append(CharSequence csq, int start, int end) throws IOException { + return this; + } + + @Override + public char charAt(int index) { + return 0; + } + + @Override + public void write(String s, int stringOffset, int len) { } - unsafeWrite(b); } - @Override - public void write(char b[], int off, int len) { - int space = buf.length - end; - if (len < space) { - unsafeWrite(b, off, len); - } else if (len < buf.length) { - unsafeWrite(b, off, space); - flush(); - unsafeWrite(b, off + space, len - space); - } else { - flush(); + + // IDEA: a subclass that refills the array from a reader? + class CharArrReader extends CharArr { + protected final Reader in; + + public CharArrReader(Reader in, int size) { + super(size); + this.in = in; + } + + @Override + public int read() throws IOException { + if (start >= end) fill(); + return start >= end ? -1 : buf[start++]; + } + + @Override + public int read(CharBuffer cb) throws IOException { + // empty the buffer and then read direct + int sz = size(); + if (sz > 0) cb.put(buf, start, end); + int sz2 = in.read(cb); + if (sz2 >= 0) return sz + sz2; + return sz > 0 ? sz : -1; + } + + @Override + public int fill() throws IOException { + if (start >= end) { + reset(); + } else if (start > 0) { + System.arraycopy(buf, start, buf, 0, size()); + end = size(); + start = 0; + } + /*** + // fill fully or not??? + do { + int sz = in.read(buf,end,buf.length-end); + if (sz==-1) return; + end+=sz; + } while (end < buf.length); + ***/ + + int sz = in.read(buf, end, buf.length - end); + if (sz > 0) end += sz; + return sz; + } + + } + + + class CharArrWriter extends CharArr { + protected Writer sink; + + @Override + public void flush() { try { - sink.write(b, off, len); + sink.write(buf, start, end - start); } catch (IOException e) { throw new RuntimeException(e); } + start = end = 0; } - } - @Override - public void write(String s, int stringOffset, int len) { - int space = buf.length - end; - if (len < space) { - s.getChars(stringOffset, stringOffset + len, buf, end); - end += len; - } else if (len < buf.length) { - // if the data to write is small enough, buffer it. - s.getChars(stringOffset, stringOffset + space, buf, end); - flush(); - s.getChars(stringOffset + space, stringOffset + len, buf, 0); - end = len - space; - } else { - flush(); - // don't buffer, just write to sink - try { - sink.write(s, stringOffset, len); - } catch (IOException e) { - throw new RuntimeException(e); + @Override + public void write(char b) { + if (end >= buf.length) { + flush(); } + unsafeWrite(b); + } + @Override + public void write(char b[], int off, int len) { + int space = buf.length - end; + if (len < space) { + unsafeWrite(b, off, len); + } else if (len < buf.length) { + unsafeWrite(b, off, space); + flush(); + unsafeWrite(b, off + space, len - space); + } else { + flush(); + try { + sink.write(b, off, len); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void write(String s, int stringOffset, int len) { + int space = buf.length - end; + if (len < space) { + s.getChars(stringOffset, stringOffset + len, buf, end); + end += len; + } else if (len < buf.length) { + // if the data to write is small enough, buffer it. + s.getChars(stringOffset, stringOffset + space, buf, end); + flush(); + s.getChars(stringOffset + space, stringOffset + len, buf, 0); + end = len - space; + } else { + flush(); + // don't buffer, just write to sink + try { + sink.write(s, stringOffset, len); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } } } } diff --git a/solr/solrj/src/java/org/noggit/JSONParser.java b/solr/solrj/src/java/org/noggit/JSONParser.java index 8b1ac01bc72..d1655d13e70 100644 --- a/solr/solrj/src/java/org/noggit/JSONParser.java +++ b/solr/solrj/src/java/org/noggit/JSONParser.java @@ -132,7 +132,7 @@ public class JSONParser { return "Unknown: " + e; } - private static final CharArr devNull = new NullCharArr(); + private static final CharArr devNull = new CharArr.NullCharArr(); protected int flags = FLAGS_DEFAULT;