mirror of https://github.com/apache/nifi.git
NIFI-11759: Remove Distributed Map Cache Client property from ListHDFS
This closes #7443 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
parent
12b7dd0243
commit
7b5853363f
|
@ -44,7 +44,6 @@ import org.apache.nifi.components.state.Scope;
|
||||||
import org.apache.nifi.components.state.StateMap;
|
import org.apache.nifi.components.state.StateMap;
|
||||||
import org.apache.nifi.deprecation.log.DeprecationLogger;
|
import org.apache.nifi.deprecation.log.DeprecationLogger;
|
||||||
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
|
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
@ -146,14 +145,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
|
||||||
RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
|
RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
|
|
||||||
.name("Distributed Cache Service")
|
|
||||||
.description("This property is ignored. State will be stored in the " + Scope.LOCAL + " or " + Scope.CLUSTER + " scope by the State Manager based on NiFi's configuration.")
|
|
||||||
.required(false)
|
|
||||||
.identifiesControllerService(DistributedMapCacheClient.class)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
|
||||||
.name("Recurse Subdirectories")
|
.name("Recurse Subdirectories")
|
||||||
.description("Indicates whether to list files from subdirectories of the HDFS directory")
|
.description("Indicates whether to list files from subdirectories of the HDFS directory")
|
||||||
|
@ -264,7 +255,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
final List<PropertyDescriptor> props = new ArrayList<>(properties);
|
final List<PropertyDescriptor> props = new ArrayList<>(properties);
|
||||||
props.add(DISTRIBUTED_CACHE_SERVICE);
|
|
||||||
props.add(DIRECTORY);
|
props.add(DIRECTORY);
|
||||||
props.add(RECURSE_SUBDIRS);
|
props.add(RECURSE_SUBDIRS);
|
||||||
props.add(RECORD_WRITER);
|
props.add(RECORD_WRITER);
|
||||||
|
@ -284,13 +274,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||||
if (context.getProperty(DISTRIBUTED_CACHE_SERVICE).isSet()) {
|
|
||||||
deprecationLogger.warn("{}[id={}] [{}] Property is not used",
|
|
||||||
getClass().getSimpleName(),
|
|
||||||
getIdentifier(),
|
|
||||||
DISTRIBUTED_CACHE_SERVICE.getDisplayName()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
|
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
|
||||||
|
|
||||||
|
|
|
@ -25,10 +25,6 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.nifi.components.state.Scope;
|
import org.apache.nifi.components.state.Scope;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
|
||||||
import org.apache.nifi.distributed.cache.client.Deserializer;
|
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
|
||||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
|
||||||
import org.apache.nifi.hadoop.KerberosProperties;
|
import org.apache.nifi.hadoop.KerberosProperties;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.apache.nifi.util.MockComponentLog;
|
import org.apache.nifi.util.MockComponentLog;
|
||||||
|
@ -44,7 +40,6 @@ import java.io.File;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -52,11 +47,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Matcher;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
|
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
|
||||||
|
@ -844,77 +835,4 @@ public class TestListHDFS {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
|
|
||||||
private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
|
|
||||||
private boolean failOnCalls = false;
|
|
||||||
|
|
||||||
private void verifyNotFail() throws IOException {
|
|
||||||
if ( failOnCalls ) {
|
|
||||||
throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
|
|
||||||
verifyNotFail();
|
|
||||||
final Object retValue = values.putIfAbsent(key, value);
|
|
||||||
return (retValue == null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
|
|
||||||
final Deserializer<V> valueDeserializer) throws IOException {
|
|
||||||
verifyNotFail();
|
|
||||||
return (V) values.putIfAbsent(key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
|
|
||||||
verifyNotFail();
|
|
||||||
return values.containsKey(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
|
|
||||||
verifyNotFail();
|
|
||||||
values.put(key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
|
|
||||||
verifyNotFail();
|
|
||||||
return (V) values.get(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
|
|
||||||
verifyNotFail();
|
|
||||||
values.remove(key);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long removeByPattern(String regex) throws IOException {
|
|
||||||
verifyNotFail();
|
|
||||||
final List<Object> removedRecords = new ArrayList<>();
|
|
||||||
Pattern p = Pattern.compile(regex);
|
|
||||||
for (Object key : values.keySet()) {
|
|
||||||
// Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset
|
|
||||||
Matcher m = p.matcher(key.toString());
|
|
||||||
if (m.matches()) {
|
|
||||||
removedRecords.add(values.get(key));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
final long numRemoved = removedRecords.size();
|
|
||||||
removedRecords.forEach(values::remove);
|
|
||||||
return numRemoved;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue