mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-21 12:27:37 +00:00
parent
efc9e8fe7b
commit
271305d5eb
@ -146,7 +146,8 @@ public class SearchStats implements Streamable, ToXContent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Stats totalStats;
|
Stats totalStats;
|
||||||
|
long openContexts;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
Map<String, Stats> groupStats;
|
Map<String, Stats> groupStats;
|
||||||
@ -155,8 +156,9 @@ public class SearchStats implements Streamable, ToXContent {
|
|||||||
totalStats = new Stats();
|
totalStats = new Stats();
|
||||||
}
|
}
|
||||||
|
|
||||||
public SearchStats(Stats totalStats, @Nullable Map<String, Stats> groupStats) {
|
public SearchStats(Stats totalStats, long openContexts, @Nullable Map<String, Stats> groupStats) {
|
||||||
this.totalStats = totalStats;
|
this.totalStats = totalStats;
|
||||||
|
this.openContexts = openContexts;
|
||||||
this.groupStats = groupStats;
|
this.groupStats = groupStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,6 +171,7 @@ public class SearchStats implements Streamable, ToXContent {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
totalStats.add(searchStats.totalStats);
|
totalStats.add(searchStats.totalStats);
|
||||||
|
openContexts += searchStats.openContexts;
|
||||||
if (includeTypes && searchStats.groupStats != null && !searchStats.groupStats.isEmpty()) {
|
if (includeTypes && searchStats.groupStats != null && !searchStats.groupStats.isEmpty()) {
|
||||||
if (groupStats == null) {
|
if (groupStats == null) {
|
||||||
groupStats = new HashMap<String, Stats>(searchStats.groupStats.size());
|
groupStats = new HashMap<String, Stats>(searchStats.groupStats.size());
|
||||||
@ -188,6 +191,10 @@ public class SearchStats implements Streamable, ToXContent {
|
|||||||
return this.totalStats;
|
return this.totalStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getOpenContexts() {
|
||||||
|
return this.openContexts;
|
||||||
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public Map<String, Stats> getGroupStats() {
|
public Map<String, Stats> getGroupStats() {
|
||||||
return this.groupStats;
|
return this.groupStats;
|
||||||
@ -196,6 +203,7 @@ public class SearchStats implements Streamable, ToXContent {
|
|||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||||
builder.startObject(Fields.SEARCH);
|
builder.startObject(Fields.SEARCH);
|
||||||
|
builder.field(Fields.OPEN, openContexts);
|
||||||
totalStats.toXContent(builder, params);
|
totalStats.toXContent(builder, params);
|
||||||
if (groupStats != null && !groupStats.isEmpty()) {
|
if (groupStats != null && !groupStats.isEmpty()) {
|
||||||
builder.startObject(Fields.GROUPS);
|
builder.startObject(Fields.GROUPS);
|
||||||
@ -212,6 +220,7 @@ public class SearchStats implements Streamable, ToXContent {
|
|||||||
|
|
||||||
static final class Fields {
|
static final class Fields {
|
||||||
static final XContentBuilderString SEARCH = new XContentBuilderString("search");
|
static final XContentBuilderString SEARCH = new XContentBuilderString("search");
|
||||||
|
static final XContentBuilderString OPEN = new XContentBuilderString("open");
|
||||||
static final XContentBuilderString GROUPS = new XContentBuilderString("groups");
|
static final XContentBuilderString GROUPS = new XContentBuilderString("groups");
|
||||||
static final XContentBuilderString QUERY_TOTAL = new XContentBuilderString("query_total");
|
static final XContentBuilderString QUERY_TOTAL = new XContentBuilderString("query_total");
|
||||||
static final XContentBuilderString QUERY_TIME = new XContentBuilderString("query_time");
|
static final XContentBuilderString QUERY_TIME = new XContentBuilderString("query_time");
|
||||||
@ -232,6 +241,7 @@ public class SearchStats implements Streamable, ToXContent {
|
|||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
totalStats = Stats.readStats(in);
|
totalStats = Stats.readStats(in);
|
||||||
|
openContexts = in.readVLong();
|
||||||
if (in.readBoolean()) {
|
if (in.readBoolean()) {
|
||||||
int size = in.readVInt();
|
int size = in.readVInt();
|
||||||
groupStats = new HashMap<String, Stats>(size);
|
groupStats = new HashMap<String, Stats>(size);
|
||||||
@ -244,6 +254,7 @@ public class SearchStats implements Streamable, ToXContent {
|
|||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
totalStats.writeTo(out);
|
totalStats.writeTo(out);
|
||||||
|
out.writeVLong(openContexts);
|
||||||
if (groupStats == null || groupStats.isEmpty()) {
|
if (groupStats == null || groupStats.isEmpty()) {
|
||||||
out.writeBoolean(false);
|
out.writeBoolean(false);
|
||||||
} else {
|
} else {
|
||||||
|
@ -42,6 +42,7 @@ public class ShardSearchService extends AbstractIndexShardComponent {
|
|||||||
private final ShardSlowLogSearchService slowLogSearchService;
|
private final ShardSlowLogSearchService slowLogSearchService;
|
||||||
|
|
||||||
private final StatsHolder totalStats = new StatsHolder();
|
private final StatsHolder totalStats = new StatsHolder();
|
||||||
|
private final CounterMetric openContexts = new CounterMetric();
|
||||||
|
|
||||||
private volatile Map<String, StatsHolder> groupsStats = ImmutableMap.of();
|
private volatile Map<String, StatsHolder> groupsStats = ImmutableMap.of();
|
||||||
|
|
||||||
@ -75,7 +76,7 @@ public class ShardSearchService extends AbstractIndexShardComponent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new SearchStats(total, groupsSt);
|
return new SearchStats(total, openContexts.count(), groupsSt);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onPreQueryPhase(SearchContext searchContext) {
|
public void onPreQueryPhase(SearchContext searchContext) {
|
||||||
@ -171,6 +172,14 @@ public class ShardSearchService extends AbstractIndexShardComponent {
|
|||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void onNewContext(SearchContext context) {
|
||||||
|
openContexts.inc();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onFreeContext(SearchContext context) {
|
||||||
|
openContexts.dec();
|
||||||
|
}
|
||||||
|
|
||||||
static class StatsHolder {
|
static class StatsHolder {
|
||||||
public final MeanMetric queryMetric = new MeanMetric();
|
public final MeanMetric queryMetric = new MeanMetric();
|
||||||
public final MeanMetric fetchMetric = new MeanMetric();
|
public final MeanMetric fetchMetric = new MeanMetric();
|
||||||
|
@ -165,8 +165,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticSearchException {
|
public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticSearchException {
|
||||||
SearchContext context = createContext(request);
|
SearchContext context = createAndPutContext(request);
|
||||||
activeContexts.put(context.id(), context);
|
|
||||||
try {
|
try {
|
||||||
contextProcessing(context);
|
contextProcessing(context);
|
||||||
dfsPhase.execute(context);
|
dfsPhase.execute(context);
|
||||||
@ -182,10 +181,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public QuerySearchResult executeScan(ShardSearchRequest request) throws ElasticSearchException {
|
public QuerySearchResult executeScan(ShardSearchRequest request) throws ElasticSearchException {
|
||||||
SearchContext context = createContext(request);
|
SearchContext context = createAndPutContext(request);
|
||||||
assert context.searchType() == SearchType.SCAN;
|
assert context.searchType() == SearchType.SCAN;
|
||||||
context.searchType(SearchType.COUNT); // move to COUNT, and then, when scrolling, move to SCAN
|
context.searchType(SearchType.COUNT); // move to COUNT, and then, when scrolling, move to SCAN
|
||||||
activeContexts.put(context.id(), context);
|
|
||||||
assert context.searchType() == SearchType.COUNT;
|
assert context.searchType() == SearchType.COUNT;
|
||||||
try {
|
try {
|
||||||
if (context.scroll() == null) {
|
if (context.scroll() == null) {
|
||||||
@ -233,8 +231,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticSearchException {
|
public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticSearchException {
|
||||||
SearchContext context = createContext(request);
|
SearchContext context = createAndPutContext(request);
|
||||||
activeContexts.put(context.id(), context);
|
|
||||||
try {
|
try {
|
||||||
context.indexShard().searchService().onPreQueryPhase(context);
|
context.indexShard().searchService().onPreQueryPhase(context);
|
||||||
long time = System.nanoTime();
|
long time = System.nanoTime();
|
||||||
@ -306,8 +303,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticSearchException {
|
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticSearchException {
|
||||||
SearchContext context = createContext(request);
|
SearchContext context = createAndPutContext(request);
|
||||||
activeContexts.put(context.id(), context);
|
|
||||||
contextProcessing(context);
|
contextProcessing(context);
|
||||||
try {
|
try {
|
||||||
context.indexShard().searchService().onPreQueryPhase(context);
|
context.indexShard().searchService().onPreQueryPhase(context);
|
||||||
@ -462,6 +458,13 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SearchContext createAndPutContext(ShardSearchRequest request) throws ElasticSearchException {
|
||||||
|
SearchContext context = createContext(request);
|
||||||
|
activeContexts.put(context.id(), context);
|
||||||
|
context.indexShard().searchService().onNewContext(context);
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
SearchContext createContext(ShardSearchRequest request) throws ElasticSearchException {
|
SearchContext createContext(ShardSearchRequest request) throws ElasticSearchException {
|
||||||
return createContext(request, null);
|
return createContext(request, null);
|
||||||
}
|
}
|
||||||
@ -517,7 +520,10 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void freeContext(SearchContext context) {
|
private void freeContext(SearchContext context) {
|
||||||
activeContexts.remove(context.id());
|
SearchContext removed = activeContexts.remove(context.id());
|
||||||
|
if (removed != null) {
|
||||||
|
removed.indexShard().searchService().onFreeContext(removed);
|
||||||
|
}
|
||||||
context.release();
|
context.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user