mirror of https://github.com/apache/nifi.git
NIFI-4311 Allowing umask to get set properly before initializing the FileSystem
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #2106.
commit cf57639396 (parent 05700a2008)
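Editor's note for context (not part of the commit): the change moves umask handling into a hook that runs before the FileSystem is created, so the value is already on the Configuration when the FileSystem is initialized. A minimal, hypothetical sketch of that ordering, using plain Hadoop client calls rather than NiFi code (class name and umask value are invented for the example):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class UmaskOrderingSketch {
        public static void main(String[] args) throws IOException {
            final Configuration config = new Configuration();

            // Put the umask on the Configuration first...
            FsPermission.setUMask(config, new FsPermission((short) 022));

            // ...and only then create the FileSystem from that Configuration.
            // This is the ordering the new preProcessConfiguration hook below
            // makes possible for sub-classes of AbstractHadoopProcessor.
            try (FileSystem fs = FileSystem.get(config)) {
                // use fs
            }
        }
    }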
@@ -252,6 +252,9 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         getConfigurationFromResources(config, configResources);
 
+        // give sub-classes a chance to process configuration
+        preProcessConfiguration(config, context);
+
         // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
         checkHdfsUriForTimeout(config);
 
@@ -287,6 +290,17 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         return new HdfsResources(config, fs, ugi);
     }
 
+    /**
+     * This method will be called after the Configuration has been created, but before the FileSystem is created,
+     * allowing sub-classes to take further action on the Configuration before creating the FileSystem.
+     *
+     * @param config the Configuration that will be used to create the FileSystem
+     * @param context the context that can be used to retrieve additional values
+     */
+    protected void preProcessConfiguration(final Configuration config, final ProcessContext context) {
+
+    }
+
     /**
      * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received
      *
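To illustrate the new extension point, a sub-class can now override preProcessConfiguration to adjust the Configuration before AbstractHadoopProcessor creates the FileSystem. The processor below is purely hypothetical (the class name and umask value are invented for this note); the real overrides introduced by this commit follow in the AbstractPutHDFSRecord and PutHDFS hunks.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    // Hypothetical sub-class, for illustration only.
    public class ExampleHadoopProcessor extends AbstractHadoopProcessor {

        @Override
        protected void preProcessConfiguration(final Configuration config, final ProcessContext context) {
            // Runs after the Configuration is built from the configured resources,
            // but before the FileSystem is created, so this umask is visible to it.
            FsPermission.setUMask(config, new FsPermission((short) 022));
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            // normal processor logic using getFileSystem() / getConfiguration()
        }
    }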
@@ -210,14 +210,8 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
         return putHdfsRecordProperties;
     }
 
-    @OnScheduled
-    public final void onScheduled(final ProcessContext context) throws IOException {
-        super.abstractOnScheduled(context);
-
-        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
-        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
-
+    @Override
+    protected void preProcessConfiguration(Configuration config, ProcessContext context) {
         // Set umask once, to avoid thread safety issues doing it in onTrigger
         final PropertyValue umaskProp = context.getProperty(UMASK);
         final short dfsUmask;
@@ -226,8 +220,16 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
         } else {
             dfsUmask = FsPermission.DEFAULT_UMASK;
         }
-        final Configuration conf = getConfiguration();
-        FsPermission.setUMask(conf, new FsPermission(dfsUmask));
+        FsPermission.setUMask(config, new FsPermission(dfsUmask));
+    }
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws IOException {
+        super.abstractOnScheduled(context);
+
+        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
+        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
     }
 
     /**
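The lines elided between the two hunks above compute dfsUmask from the processor's UMASK property. As a hedged sketch of that computation (reconstructed for readability, not copied from the diff; the helper class and method names are invented), the value is parsed as an octal string when the property is set and falls back to the Hadoop default otherwise:

    import org.apache.hadoop.fs.permission.FsPermission;

    // Sketch only: how a umask property value (an octal string such as "022")
    // can be turned into the short that FsPermission expects.
    public final class UmaskParseSketch {

        static short toDfsUmask(final String umaskValue) {
            if (umaskValue != null && !umaskValue.isEmpty()) {
                return Short.parseShort(umaskValue, 8);        // interpret as octal
            }
            return (short) FsPermission.DEFAULT_UMASK;         // Hadoop default, 022
        }

        public static void main(String[] args) {
            System.out.println(toDfsUmask("027"));  // prints 23, the decimal value of octal 027
        }
    }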
@@ -32,7 +32,6 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -188,10 +187,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
         return props;
     }
 
-    @OnScheduled
-    public void onScheduled(ProcessContext context) throws Exception {
-        super.abstractOnScheduled(context);
-
+    @Override
+    protected void preProcessConfiguration(final Configuration config, final ProcessContext context) {
         // Set umask once, to avoid thread safety issues doing it in onTrigger
         final PropertyValue umaskProp = context.getProperty(UMASK);
         final short dfsUmask;
@@ -200,8 +197,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
         } else {
             dfsUmask = FsPermission.DEFAULT_UMASK;
         }
-        final Configuration conf = getConfiguration();
-        FsPermission.setUMask(conf, new FsPermission(dfsUmask));
+
+        FsPermission.setUMask(config, new FsPermission(dfsUmask));
     }
 
     @Override