HADOOP-17099. Replace Guava Predicate with Java8+ Predicate
Signed-off-by: Jonathan Eagles <jeagles@gmail.com>
(cherry picked from commit 1f71c4ae71
)
This commit is contained in:
parent
34a6cbb5bb
commit
b7b9bd32db
|
@ -123,7 +123,7 @@
|
|||
<property name="regexp" value="true"/>
|
||||
<property name="illegalPkgs" value="^sun\.[^.]+"/>
|
||||
<property name="illegalClasses"
|
||||
value="^com\.google\.common\.base\.(Optional|Function), ^com\.google\.common\.collect\.(ImmutableListMultimap)"/>
|
||||
value="^com\.google\.common\.base\.(Optional|Function|Predicate), ^com\.google\.common\.collect\.(ImmutableListMultimap)"/>
|
||||
</module>
|
||||
<module name="RedundantImport"/>
|
||||
<module name="UnusedImports"/>
|
||||
|
|
|
@ -18,8 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.metrics2.impl;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.StreamSupport;
|
||||
import org.apache.hadoop.metrics2.AbstractMetric;
|
||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||
import org.apache.hadoop.metrics2.MetricsTag;
|
||||
|
@ -65,16 +65,22 @@ public class MetricsRecords {
|
|||
resourceLimitMetric);
|
||||
}
|
||||
|
||||
private static MetricsTag getFirstTagByName(MetricsRecord record, String name) {
|
||||
return Iterables.getFirst(Iterables.filter(record.tags(),
|
||||
new MetricsTagPredicate(name)), null);
|
||||
private static MetricsTag getFirstTagByName(MetricsRecord record,
|
||||
String name) {
|
||||
if (record.tags() == null) {
|
||||
return null;
|
||||
}
|
||||
return record.tags().stream().filter(
|
||||
new MetricsTagPredicate(name)).findFirst().orElse(null);
|
||||
}
|
||||
|
||||
private static AbstractMetric getFirstMetricByName(
|
||||
MetricsRecord record, String name) {
|
||||
return Iterables.getFirst(
|
||||
Iterables.filter(record.metrics(), new AbstractMetricPredicate(name)),
|
||||
null);
|
||||
if (record.metrics() == null) {
|
||||
return null;
|
||||
}
|
||||
return StreamSupport.stream(record.metrics().spliterator(), false)
|
||||
.filter(new AbstractMetricPredicate(name)).findFirst().orElse(null);
|
||||
}
|
||||
|
||||
private static class MetricsTagPredicate implements Predicate<MetricsTag> {
|
||||
|
@ -86,7 +92,7 @@ public class MetricsRecords {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(MetricsTag input) {
|
||||
public boolean test(MetricsTag input) {
|
||||
return input.name().equals(tagName);
|
||||
}
|
||||
}
|
||||
|
@ -101,7 +107,7 @@ public class MetricsRecords {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(AbstractMetric input) {
|
||||
public boolean test(AbstractMetric input) {
|
||||
return input.name().equals(metricName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,9 +23,7 @@ import java.io.IOException;
|
|||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.*;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.util.stream.StreamSupport;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
|
@ -38,7 +36,6 @@ import org.mockito.stubbing.Answer;
|
|||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
|
@ -59,7 +56,6 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
|||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -246,13 +242,9 @@ public class TestMetricsSystemImpl {
|
|||
for (Thread t : threads)
|
||||
t.join();
|
||||
assertEquals(0L, ms.droppedPubAll.value());
|
||||
assertTrue(StringUtils.join("\n", Arrays.asList(results)),
|
||||
Iterables.all(Arrays.asList(results), new Predicate<String>() {
|
||||
@Override
|
||||
public boolean apply(@Nullable String input) {
|
||||
return input.equalsIgnoreCase("Passed");
|
||||
}
|
||||
}));
|
||||
assertTrue(String.join("\n", Arrays.asList(results)),
|
||||
Arrays.asList(results).stream().allMatch(
|
||||
input -> input.equalsIgnoreCase("Passed")));
|
||||
ms.stop();
|
||||
ms.shutdown();
|
||||
}
|
||||
|
@ -482,14 +474,12 @@ public class TestMetricsSystemImpl {
|
|||
ms.onTimerEvent();
|
||||
verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture());
|
||||
List<MetricsRecord> mr = r1.getAllValues();
|
||||
Number qSize = Iterables.find(mr.get(1).metrics(),
|
||||
new Predicate<AbstractMetric>() {
|
||||
@Override
|
||||
public boolean apply(@Nullable AbstractMetric input) {
|
||||
Number qSize = StreamSupport.stream(mr.get(1).metrics().spliterator(),
|
||||
false).filter(
|
||||
input -> {
|
||||
assert input != null;
|
||||
return input.name().equals("Sink_slowSinkQsize");
|
||||
}
|
||||
}).value();
|
||||
}).findFirst().get().value();
|
||||
assertEquals(1, qSize);
|
||||
} finally {
|
||||
proceedSignal.countDown();
|
||||
|
|
|
@ -21,9 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.UnmodifiableIterator;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Collections2;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -40,7 +39,7 @@ import java.util.Collection;
|
|||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
|
||||
|
||||
import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
|
||||
|
||||
|
@ -84,37 +83,26 @@ public class CombinedHostFileManager extends HostConfigManager {
|
|||
// If the includes list is empty, act as if everything is in the
|
||||
// includes list.
|
||||
synchronized boolean isIncluded(final InetSocketAddress address) {
|
||||
return emptyInServiceNodeLists || Iterables.any(
|
||||
allDNs.get(address.getAddress()),
|
||||
new Predicate<DatanodeAdminProperties>() {
|
||||
public boolean apply(DatanodeAdminProperties input) {
|
||||
return input.getPort() == 0 ||
|
||||
input.getPort() == address.getPort();
|
||||
}
|
||||
});
|
||||
return emptyInServiceNodeLists || allDNs.get(address.getAddress())
|
||||
.stream().anyMatch(
|
||||
input -> input.getPort() == 0 ||
|
||||
input.getPort() == address.getPort());
|
||||
}
|
||||
|
||||
synchronized boolean isExcluded(final InetSocketAddress address) {
|
||||
return Iterables.any(allDNs.get(address.getAddress()),
|
||||
new Predicate<DatanodeAdminProperties>() {
|
||||
public boolean apply(DatanodeAdminProperties input) {
|
||||
return input.getAdminState().equals(
|
||||
return allDNs.get(address.getAddress()).stream().anyMatch(
|
||||
input -> input.getAdminState().equals(
|
||||
AdminStates.DECOMMISSIONED) &&
|
||||
(input.getPort() == 0 ||
|
||||
input.getPort() == address.getPort());
|
||||
}
|
||||
});
|
||||
input.getPort() == address.getPort()));
|
||||
}
|
||||
|
||||
synchronized String getUpgradeDomain(final InetSocketAddress address) {
|
||||
Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
|
||||
allDNs.get(address.getAddress()),
|
||||
new Predicate<DatanodeAdminProperties>() {
|
||||
public boolean apply(DatanodeAdminProperties input) {
|
||||
return (input.getPort() == 0 ||
|
||||
input.getPort() == address.getPort());
|
||||
}
|
||||
});
|
||||
Iterable<DatanodeAdminProperties> datanode =
|
||||
allDNs.get(address.getAddress()).stream().filter(
|
||||
input -> (input.getPort() == 0 ||
|
||||
input.getPort() == address.getPort())).collect(
|
||||
Collectors.toList());
|
||||
return datanode.iterator().hasNext() ?
|
||||
datanode.iterator().next().getUpgradeDomain() : null;
|
||||
}
|
||||
|
@ -129,36 +117,22 @@ public class CombinedHostFileManager extends HostConfigManager {
|
|||
}
|
||||
|
||||
Iterable<InetSocketAddress> getExcludes() {
|
||||
return new Iterable<InetSocketAddress>() {
|
||||
@Override
|
||||
public Iterator<InetSocketAddress> iterator() {
|
||||
return new HostIterator(
|
||||
Collections2.filter(allDNs.entries(),
|
||||
new Predicate<java.util.Map.Entry<InetAddress,
|
||||
DatanodeAdminProperties>>() {
|
||||
public boolean apply(java.util.Map.Entry<InetAddress,
|
||||
DatanodeAdminProperties> entry) {
|
||||
return entry.getValue().getAdminState().equals(
|
||||
AdminStates.DECOMMISSIONED);
|
||||
}
|
||||
}
|
||||
));
|
||||
}
|
||||
};
|
||||
return () -> new HostIterator(
|
||||
allDNs.entries().stream().filter(
|
||||
entry -> entry.getValue().getAdminState().equals(
|
||||
AdminStates.DECOMMISSIONED)).collect(
|
||||
Collectors.toList()));
|
||||
}
|
||||
|
||||
synchronized long getMaintenanceExpireTimeInMS(
|
||||
final InetSocketAddress address) {
|
||||
Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
|
||||
allDNs.get(address.getAddress()),
|
||||
new Predicate<DatanodeAdminProperties>() {
|
||||
public boolean apply(DatanodeAdminProperties input) {
|
||||
return input.getAdminState().equals(
|
||||
Iterable<DatanodeAdminProperties> datanode =
|
||||
allDNs.get(address.getAddress()).stream().filter(
|
||||
input -> input.getAdminState().equals(
|
||||
AdminStates.IN_MAINTENANCE) &&
|
||||
(input.getPort() == 0 ||
|
||||
input.getPort() == address.getPort());
|
||||
}
|
||||
});
|
||||
input.getPort() == address.getPort())).collect(
|
||||
Collectors.toList());
|
||||
// if DN isn't set to maintenance state, ignore MaintenanceExpireTimeInMS
|
||||
// set in the config.
|
||||
return datanode.iterator().hasNext() ?
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Map;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DF;
|
||||
|
@ -34,8 +35,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Collections2;
|
||||
import com.google.common.base.Predicate;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -116,17 +115,14 @@ public class NameNodeResourceChecker {
|
|||
Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
|
||||
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
|
||||
|
||||
Collection<URI> localEditDirs = Collections2.filter(
|
||||
FSNamesystem.getNamespaceEditsDirs(conf),
|
||||
new Predicate<URI>() {
|
||||
@Override
|
||||
public boolean apply(URI input) {
|
||||
Collection<URI> localEditDirs =
|
||||
FSNamesystem.getNamespaceEditsDirs(conf).stream().filter(
|
||||
input -> {
|
||||
if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
// Add all the local edits dirs, marking some as required if they are
|
||||
// configured as such.
|
||||
|
|
|
@ -24,7 +24,7 @@ import java.text.SimpleDateFormat;
|
|||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
|
@ -38,9 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
|
|||
import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
|
||||
import org.apache.hadoop.hdfs.util.ReadOnlyList;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
|
||||
/** Snapshot of a sub-tree in the namesystem. */
|
||||
|
@ -149,20 +146,14 @@ public class Snapshot implements Comparable<byte[]> {
|
|||
static public class Root extends INodeDirectory {
|
||||
Root(INodeDirectory other) {
|
||||
// Always preserve ACL, XAttr.
|
||||
super(other, false, Lists.newArrayList(
|
||||
Iterables.filter(Arrays.asList(other.getFeatures()), new Predicate<Feature>() {
|
||||
|
||||
@Override
|
||||
public boolean apply(Feature input) {
|
||||
super(other, false, Arrays.asList(other.getFeatures()).stream().filter(
|
||||
input -> {
|
||||
if (AclFeature.class.isInstance(input)
|
||||
|| XAttrFeature.class.isInstance(input)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}))
|
||||
.toArray(new Feature[0]));
|
||||
}).collect(Collectors.toList()).toArray(new Feature[0]));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -72,7 +72,6 @@ import org.apache.hadoop.yarn.util.Times;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
|
@ -353,14 +352,9 @@ public class AggregatedLogFormat {
|
|||
: this.logAggregationContext.getRolledLogsExcludePattern(),
|
||||
candidates, true);
|
||||
|
||||
Iterable<File> mask =
|
||||
Iterables.filter(candidates, new Predicate<File>() {
|
||||
@Override
|
||||
public boolean apply(File next) {
|
||||
return !alreadyUploadedLogFiles
|
||||
.contains(getLogFileMetaData(next));
|
||||
}
|
||||
});
|
||||
Iterable<File> mask = Iterables.filter(candidates, (input) ->
|
||||
!alreadyUploadedLogFiles
|
||||
.contains(getLogFileMetaData(input)));
|
||||
return Sets.newHashSet(mask);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,9 +19,7 @@
|
|||
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
@ -36,6 +34,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -482,17 +481,12 @@ public abstract class LogAggregationFileController {
|
|||
Set<FileStatus> status =
|
||||
new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
|
||||
|
||||
Iterable<FileStatus> mask =
|
||||
Iterables.filter(status, new Predicate<FileStatus>() {
|
||||
@Override
|
||||
public boolean apply(FileStatus next) {
|
||||
return next.getPath().getName()
|
||||
status = status.stream().filter(
|
||||
next -> next.getPath().getName()
|
||||
.contains(LogAggregationUtils.getNodeString(nodeId))
|
||||
&& !next.getPath().getName().endsWith(
|
||||
LogAggregationUtils.TMP_FILE_SUFFIX);
|
||||
}
|
||||
});
|
||||
status = Sets.newHashSet(mask);
|
||||
LogAggregationUtils.TMP_FILE_SUFFIX)).collect(
|
||||
Collectors.toSet());
|
||||
// Normally, we just need to delete one oldest log
|
||||
// before we upload a new log.
|
||||
// If we can not delete the older logs in this cycle,
|
||||
|
|
|
@ -19,9 +19,7 @@
|
|||
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -44,6 +42,7 @@ import java.util.Map;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang.SerializationUtils;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -670,16 +669,12 @@ public class LogAggregationIndexedFileController
|
|||
public Map<String, Long> parseCheckSumFiles(
|
||||
List<FileStatus> fileList) throws IOException {
|
||||
Map<String, Long> checkSumFiles = new HashMap<>();
|
||||
Set<FileStatus> status = new HashSet<FileStatus>(fileList);
|
||||
Iterable<FileStatus> mask =
|
||||
Iterables.filter(status, new Predicate<FileStatus>() {
|
||||
@Override
|
||||
public boolean apply(FileStatus next) {
|
||||
return next.getPath().getName().endsWith(
|
||||
CHECK_SUM_FILE_SUFFIX);
|
||||
}
|
||||
});
|
||||
status = Sets.newHashSet(mask);
|
||||
Set<FileStatus> status =
|
||||
new HashSet<>(fileList).stream().filter(
|
||||
next -> next.getPath().getName().endsWith(
|
||||
CHECK_SUM_FILE_SUFFIX)).collect(
|
||||
Collectors.toSet());
|
||||
|
||||
FileContext fc = null;
|
||||
for (FileStatus file : status) {
|
||||
FSDataInputStream checksumFileInputStream = null;
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -71,8 +72,6 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
import org.apache.hadoop.yarn.util.Times;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
|
||||
|
@ -619,16 +618,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
.getCurrentUpLoadedFileMeta());
|
||||
// if any of the previous uploaded logs have been deleted,
|
||||
// we need to remove them from alreadyUploadedLogs
|
||||
Iterable<String> mask =
|
||||
Iterables.filter(uploadedFileMeta, new Predicate<String>() {
|
||||
@Override
|
||||
public boolean apply(String next) {
|
||||
return logValue.getAllExistingFilesMeta().contains(next);
|
||||
}
|
||||
});
|
||||
|
||||
this.uploadedFileMeta = Sets.newHashSet(mask);
|
||||
|
||||
this.uploadedFileMeta = uploadedFileMeta.stream().filter(
|
||||
next -> logValue.getAllExistingFilesMeta().contains(next)).collect(
|
||||
Collectors.toSet());
|
||||
// need to return files uploaded or older-than-retention clean up.
|
||||
return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
|
||||
logValue.getObsoleteRetentionLogFiles());
|
||||
|
|
Loading…
Reference in New Issue