Remove unnecessary callback interface

We have a callback interface that is not needed because it is
effectively the same as java.util.function.Consumer. This commit removes
it.

Relates #25089
This commit is contained in:
Jason Tedor 2017-06-06 20:50:03 -04:00 committed by GitHub
parent feca0a9f33
commit 2f5f27fafa
11 changed files with 29 additions and 73 deletions

View File

@ -23,7 +23,6 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Callback;
import java.io.BufferedReader;
import java.io.IOException;
@ -37,6 +36,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
/**
* Simple utility methods for file and stream copying.
@ -222,20 +222,15 @@ public abstract class Streams {
public static List<String> readAllLines(InputStream input) throws IOException {
final List<String> lines = new ArrayList<>();
readAllLines(input, new Callback<String>() {
@Override
public void handle(String line) {
lines.add(line);
}
});
readAllLines(input, lines::add);
return lines;
}
public static void readAllLines(InputStream input, Callback<String> callback) throws IOException {
public static void readAllLines(InputStream input, Consumer<String> consumer) throws IOException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
callback.handle(line);
consumer.accept(line);
}
}
}

View File

@ -1,26 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
public interface Callback<T> {
void handle(T t);
}

View File

@ -81,7 +81,6 @@ import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
@ -514,7 +513,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
@Override
public void handle(ShardLock lock) {
public void accept(ShardLock lock) {
try {
assert lock.getShardId().equals(shardId) : "shard id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
onShardClose(lock);

View File

@ -54,7 +54,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.index.Index;
@ -1255,7 +1254,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
public void addShardFailureCallback(Callback<ShardFailure> onShardFailure) {
public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {
this.shardEventListener.delegates.add(onShardFailure);
}
@ -1767,15 +1766,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Callback<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
// called by the current engine
@Override
public void onFailedEngine(String reason, @Nullable Exception failure) {
final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure);
for (Callback<ShardFailure> listener : delegates) {
for (Consumer<ShardFailure> listener : delegates) {
try {
listener.handle(shardFailure);
listener.accept(shardFailure);
} catch (Exception inner) {
inner.addSuppressed(failure);
logger.warn("exception while notifying engine failure", inner);
@ -2051,7 +2050,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Simple struct encapsulating a shard failure
*
* @see IndexShard#addShardFailureCallback(Callback)
* @see IndexShard#addShardFailureCallback(Consumer)
*/
public static final class ShardFailure {
public final ShardRouting routing;

View File

@ -24,7 +24,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
@ -62,11 +61,10 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
@ -99,8 +97,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@ -412,7 +410,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
try {
directory.innerClose(); // this closes the distributorDirectory as well
} finally {
onClose.handle(shardLock);
onClose.accept(shardLock);
}
} catch (IOException e) {
logger.debug("failed to close directory", e);
@ -1371,14 +1369,14 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* A listener that is executed once the store is closed and all references to it are released
*/
public interface OnClose extends Callback<ShardLock> {
public interface OnClose extends Consumer<ShardLock> {
OnClose EMPTY = new OnClose() {
/**
* This method is called while the provided {@link org.elasticsearch.env.ShardLock} is held.
* This method is only called once after all resources for a store are released.
*/
@Override
public void handle(ShardLock Lock) {
public void accept(ShardLock Lock) {
}
};
}

View File

@ -55,7 +55,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -63,7 +62,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -491,7 +489,7 @@ public class IndicesService extends AbstractLifecycleComponent
@Override
public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
Callback<IndexShard.ShardFailure> onShardFailure) throws IOException {
Consumer<IndexShard.ShardFailure> onShardFailure) throws IOException {
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
IndexShard indexShard = indexService.createShard(shardRouting);

View File

@ -45,7 +45,6 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.ShardLockObtainFailedException;
@ -54,8 +53,8 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexComponent;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
@ -694,9 +693,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
}
}
private class FailedShardHandler implements Callback<IndexShard.ShardFailure> {
private class FailedShardHandler implements Consumer<IndexShard.ShardFailure> {
@Override
public void handle(final IndexShard.ShardFailure shardFailure) {
public void accept(final IndexShard.ShardFailure shardFailure) {
final ShardRouting shardRouting = shardFailure.routing;
threadPool.generic().execute(() -> {
synchronized (IndicesClusterStateService.this) {
@ -832,7 +831,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
*/
T createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
Callback<IndexShard.ShardFailure> onShardFailure) throws IOException;
Consumer<IndexShard.ShardFailure> onShardFailure) throws IOException;
/**
* Returns shard for the specified id if it exists otherwise returns <code>null</code>.

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

View File

@ -27,7 +27,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
@ -52,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
@ -226,7 +226,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Callback<IndexShard.ShardFailure> onShardFailure) throws IOException {
Consumer<IndexShard.ShardFailure> onShardFailure) throws IOException {
failRandomly();
MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
MockIndexShard indexShard = indexService.createShard(shardRouting);

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.Callback;
import java.io.IOException;
import java.io.InputStream;

View File

@ -30,7 +30,6 @@ import com.google.api.services.storage.Storage;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
@ -291,16 +290,13 @@ public class MockHttpTransport extends com.google.api.client.testing.http.MockHt
try (ByteArrayOutputStream os = new ByteArrayOutputStream((int) req.getContentLength())) {
req.getStreamingContent().writeTo(os);
Streams.readAllLines(new ByteArrayInputStream(os.toByteArray()), new Callback<String>() {
@Override
public void handle(String line) {
Handler handler = handlers.retrieve(line, params);
if (handler != null) {
try {
responses.add(handler.execute(line, params, req));
} catch (IOException e) {
responses.add(newMockError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
Streams.readAllLines(new ByteArrayInputStream(os.toByteArray()), line -> {
Handler handler = handlers.retrieve(line, params);
if (handler != null) {
try {
responses.add(handler.execute(line, params, req));
} catch (IOException e) {
responses.add(newMockError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
});