Skip to content

Commit 70bfab9

Browse files
committed
HADOOP-19363. Fixing audit tests
Most of this PR is trying to improve debugging of the auditor invocation plane on the assumption that those flags being passed down from the factory were causing problems. None of those changes did any good, though they did marginally improve debugging. The actual problem was ordering of component startup during FS init: the Auditor must be live before the AWS client is initialized. Moved back to the right place and improved documentation. Also: added a test to verify that setting flags would disable the span checks, which is what we now require. Change-Id: I108116f0775b71b1cf1c9a2bd5c95727f24f37bb
1 parent e5371d2 commit 70bfab9

File tree

9 files changed

+126
-40
lines changed

9 files changed

+126
-40
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@
230230
import static org.apache.hadoop.fs.s3a.Listing.toLocatedFileStatusIterator;
231231
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
232232
import static org.apache.hadoop.fs.s3a.Statistic.*;
233+
import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.isRejectOutOfSpan;
233234
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.INITIALIZE_SPAN;
234235
import static org.apache.hadoop.fs.s3a.auth.CredentialProviderListFactory.createAWSCredentialProviderList;
235236
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.STATEMENT_ALLOW_KMS_RW;
@@ -256,6 +257,7 @@
256257
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
257258
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
258259
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore;
260+
import static org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests;
259261
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.checkNoS3Guard;
260262
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
261263
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_CONTINUE_LIST_REQUEST;
@@ -301,6 +303,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
301303

302304
private String username;
303305

306+
/**
307+
* Store back end.
308+
*/
304309
private S3AStore store;
305310

306311
/**
@@ -683,13 +688,22 @@ public void initialize(URI name, Configuration originalConf)
683688
signerManager = new SignerManager(bucket, this, conf, owner);
684689
signerManager.initCustomSigners();
685690

691+
// start auditing
692+
// extra configuration will be passed down later.
693+
initializeAuditService();
694+
695+
// create the requestFactory.
696+
// requires the audit manager to be initialized.
697+
requestFactory = createRequestFactory();
698+
686699
// create an initial span for all other operations.
687700
span = createSpan(INITIALIZE_SPAN, bucket, null);
688701

689702
// creates the AWS client, including overriding auth chain if
690703
// the FS came with a DT
691704
// this may do some patching of the configuration (e.g. setting
692705
// the encryption algorithms)
706+
// requires the audit manager to be initialized.
693707
ClientManager clientManager = createClientManager(name, delegationTokensEnabled);
694708

695709
inputPolicy = S3AInputPolicy.getPolicy(
@@ -770,14 +784,6 @@ public void initialize(URI name, Configuration originalConf)
770784

771785
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
772786

773-
// start auditing.
774-
// extra configuration will be passed down later.
775-
initializeAuditService();
776-
777-
// create the requestFactory.
778-
// requires the audit manager to be initialized.
779-
requestFactory = createRequestFactory();
780-
781787
// now create and initialize the store
782788
store = createS3AStore(clientManager, rateLimitCapacity);
783789
// the s3 client is created through the store, rather than
@@ -792,8 +798,7 @@ public void initialize(URI name, Configuration originalConf)
792798
// If the input stream can issue get requests outside spans,
793799
// the auditor is forced to disable rejection of unaudited requests.
794800
final EnumSet<AuditorFlags> flags = EnumSet.noneOf(AuditorFlags.class);
795-
if (factoryRequirements.requires(
796-
StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests)) {
801+
if (factoryRequirements.requires(ExpectUnauditedGetRequests)) {
797802
flags.add(AuditorFlags.PermitOutOfBandOperations);
798803
}
799804
getAuditManager().setAuditFlags(flags);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static java.util.Objects.requireNonNull;
3939
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_ENABLED;
4040
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_ENABLED_DEFAULT;
41+
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS;
4142
import static org.apache.hadoop.fs.s3a.audit.impl.S3AInternalAuditConstants.AUDIT_SPAN_EXECUTION_ATTRIBUTE;
4243

4344
/**
@@ -68,7 +69,7 @@ public static AuditManagerS3A createAndStartAuditManager(
6869
auditManager = new ActiveAuditManagerS3A(
6970
requireNonNull(iostatistics));
7071
} else {
71-
LOG.debug("auditing is disabled");
72+
LOG.debug(" is disabled");
7273
auditManager = stubAuditManager();
7374
}
7475
auditManager.init(conf);
@@ -186,4 +187,14 @@ public static boolean containsAuditException(Exception exception) {
186187
|| exception.getCause() instanceof AuditFailureException;
187188
}
188189

190+
/**
191+
* Check if the configuration is set to reject operations that are
192+
* performed outside of an audit span.
193+
*
194+
* @param conf the configuration to check
195+
* @return true if operations outside of an audit span should be rejected, false otherwise
196+
*/
197+
public static boolean isRejectOutOfSpan(final Configuration conf) {
198+
return conf.getBoolean(REJECT_OUT_OF_SPAN_OPERATIONS, false);
199+
}
189200
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/AbstractOperationAuditor.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
import java.util.concurrent.atomic.AtomicBoolean;
2424
import java.util.concurrent.atomic.AtomicLong;
2525

26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.fs.s3a.audit.AuditIntegration;
2631
import org.apache.hadoop.fs.s3a.audit.AuditorFlags;
2732
import org.apache.hadoop.fs.s3a.audit.OperationAuditor;
2833
import org.apache.hadoop.fs.s3a.audit.OperationAuditorOptions;
@@ -41,6 +46,9 @@
4146
public abstract class AbstractOperationAuditor extends AbstractService
4247
implements OperationAuditor {
4348

49+
private static final Logger LOG =
50+
LoggerFactory.getLogger(AbstractOperationAuditor.class);
51+
4452
/**
4553
* Base of IDs is a UUID.
4654
*/
@@ -79,6 +87,11 @@ public abstract class AbstractOperationAuditor extends AbstractService
7987
*/
8088
private final String auditorID = auditorUUID.toString();
8189

90+
/**
91+
* Audit flags which can be passed down to subclasses.
92+
*/
93+
private EnumSet<AuditorFlags> auditorFlags;
94+
8295
/**
8396
* Construct.
8497
* @param name name
@@ -99,6 +112,15 @@ public void init(final OperationAuditorOptions opts) {
99112
init(opts.getConfiguration());
100113
}
101114

115+
@Override
116+
protected void serviceInit(final Configuration conf) throws Exception {
117+
super.serviceInit(conf);
118+
setRejectOutOfSpan(AuditIntegration.isRejectOutOfSpan(conf));
119+
LOG.debug("{}: Out of span operations will be {}",
120+
getName(),
121+
isRejectOutOfSpan() ? "rejected" : "ignored");
122+
}
123+
102124
@Override
103125
public String getAuditorId() {
104126
return auditorID;
@@ -129,8 +151,51 @@ protected final String createSpanID() {
129151
auditorID, SPAN_ID_COUNTER.incrementAndGet());
130152
}
131153

154+
/**
155+
* Should out of scope ops be rejected?
156+
*/
157+
protected boolean isRejectOutOfSpan() {
158+
return rejectOutOfSpan.get();
159+
}
160+
161+
/**
162+
* Enable/disable out of span rejection.
163+
* @param rejectOutOfSpan new value.
164+
*/
165+
protected void setRejectOutOfSpan(boolean rejectOutOfSpan) {
166+
this.rejectOutOfSpan.set(rejectOutOfSpan);
167+
}
168+
169+
/**
170+
* Update Auditor flags.
171+
* Calls {@link #auditorFlagsChanged(EnumSet)} after the update.
172+
* @param flags audit flags.
173+
*/
132174
@Override
133175
public void setAuditFlags(final EnumSet<AuditorFlags> flags) {
134-
/* no-op */
176+
auditorFlags = flags;
177+
auditorFlagsChanged(flags);
178+
}
179+
180+
/**
181+
* Get the current set of auditor flags.
182+
*
183+
* @return the current set of auditor flags.
184+
*/
185+
public EnumSet<AuditorFlags> getAuditorFlags() {
186+
return auditorFlags;
135187
}
188+
189+
/**
190+
* Notification that the auditor flags have been updated.
191+
* @param flags audit flags.
192+
*/
193+
protected void auditorFlagsChanged(EnumSet<AuditorFlags> flags) {
194+
// if out of band operations are allowed, configuration settings are overridden
195+
if (flags.contains(AuditorFlags.PermitOutOfBandOperations)) {
196+
LOG.debug("Out of span operations are required by the stream factory");
197+
setRejectOutOfSpan(false);
198+
}
199+
}
200+
136201
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import javax.annotation.Nullable;
2222
import java.io.IOException;
2323
import java.util.Collection;
24-
import java.util.EnumSet;
2524
import java.util.HashMap;
2625
import java.util.Map;
2726
import java.util.Optional;
@@ -47,7 +46,6 @@
4746
import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
4847
import org.apache.hadoop.fs.s3a.audit.AuditOperationRejectedException;
4948
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
50-
import org.apache.hadoop.fs.s3a.audit.AuditorFlags;
5149
import org.apache.hadoop.fs.store.LogExactlyOnce;
5250
import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
5351
import org.apache.hadoop.security.UserGroupInformation;
@@ -67,7 +65,6 @@
6765
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
6866
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT;
6967
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_FILTER;
70-
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS;
7168
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.UNAUDITED_OPERATION;
7269
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID;
7370
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER;
@@ -99,11 +96,6 @@ public class LoggingAuditor
9996
*/
10097
private AuditSpanS3A warningSpan;
10198

102-
/**
103-
* Should out of scope ops be rejected?
104-
*/
105-
private boolean rejectOutOfSpan;
106-
10799
/**
108100
* Map of attributes which will be added to all operations.
109101
*/
@@ -173,8 +165,6 @@ public LoggingAuditor() {
173165
@Override
174166
protected void serviceInit(final Configuration conf) throws Exception {
175167
super.serviceInit(conf);
176-
rejectOutOfSpan = conf.getBoolean(
177-
REJECT_OUT_OF_SPAN_OPERATIONS, false);
178168
// attach the job ID if there is one in the configuration used
179169
// to create this file.
180170
String jobID = extractJobID(conf);
@@ -189,6 +179,7 @@ protected void serviceInit(final Configuration conf) throws Exception {
189179
currentContext, createSpanID(), null, null);
190180
isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
191181
DEFAULT_MULTIPART_UPLOAD_ENABLED);
182+
LOG.debug("Initialized {}", this);
192183
}
193184

194185
@Override
@@ -197,7 +188,7 @@ public String toString() {
197188
"LoggingAuditor{");
198189
sb.append("ID='").append(getAuditorId()).append('\'');
199190
sb.append(", headerEnabled=").append(headerEnabled);
200-
sb.append(", rejectOutOfSpan=").append(rejectOutOfSpan);
191+
sb.append(", rejectOutOfSpan=").append(isRejectOutOfSpan());
201192
sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled);
202193
sb.append('}');
203194
return sb.toString();
@@ -217,15 +208,6 @@ public AuditSpanS3A createSpan(final String operation,
217208
return span;
218209
}
219210

220-
@Override
221-
public void setAuditFlags(final EnumSet<AuditorFlags> flags) {
222-
// if out of band operations are allowed, configuration settings are overridden
223-
if (flags.contains(AuditorFlags.PermitOutOfBandOperations)) {
224-
LOG.debug("Out of span operations are required by the stream factory");
225-
rejectOutOfSpan = false;
226-
}
227-
}
228-
229211
/**
230212
* Get/Prepare the active context for a span.
231213
* @return the common audit context.
@@ -275,6 +257,7 @@ private void setLastHeader(final String lastHeader) {
275257
HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
276258
return ((LoggingAuditSpan) span).getReferrer();
277259
}
260+
278261
/**
279262
* Span which logs at debug and sets the HTTP referrer on
280263
* invocations.
@@ -556,7 +539,7 @@ public void beforeExecution(Context.BeforeExecution context,
556539
} else {
557540
final RuntimeException ex = new AuditFailureException(unaudited);
558541
LOG.debug(unaudited, ex);
559-
if (rejectOutOfSpan) {
542+
if (isRejectOutOfSpan()) {
560543
throw ex;
561544
}
562545
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/NoopAuditManagerS3A.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public NoopAuditManagerS3A() {
7979
@Override
8080
protected void serviceInit(final Configuration conf) throws Exception {
8181
super.serviceInit(conf);
82-
NoopAuditor audit = new NoopAuditor(this);
82+
NoopAuditor audit = new NoopAuditor("NoopAuditor", this);
8383
final OperationAuditorOptions options =
8484
OperationAuditorOptions.builder()
8585
.withConfiguration(conf)

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/NoopAuditor.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,23 +47,29 @@ public class NoopAuditor extends AbstractOperationAuditor {
4747
* Constructor.
4848
* This will be used when the auditor is created through
4949
* configuration and classloading.
50+
* @param name auditor name
5051
*/
51-
public NoopAuditor() {
52-
this(null);
52+
public NoopAuditor(String name) {
53+
this(name, null);
5354
}
5455

5556
/**
5657
* Constructor when explicitly created within
5758
* the {@link NoopAuditManagerS3A}.
59+
* @param name auditor name
5860
* @param activationCallbacks Activation callbacks.
5961
*/
6062
public NoopAuditor(
61-
NoopSpan.SpanActivationCallbacks activationCallbacks) {
62-
super("NoopAuditor");
63+
String name, NoopSpan.SpanActivationCallbacks activationCallbacks) {
64+
super(name);
6365
this.unbondedSpan = createSpan("unbonded", null, null);
6466
this.activationCallbacks = activationCallbacks;
6567
}
6668

69+
public NoopAuditor() {
70+
this("NoopAuditor");
71+
}
72+
6773
@Override
6874
public AuditSpanS3A createSpan(
6975
final String operation,
@@ -86,7 +92,7 @@ public AuditSpanS3A getUnbondedSpan() {
8692
*/
8793
public static NoopAuditor createAndStartNoopAuditor(Configuration conf,
8894
NoopSpan.SpanActivationCallbacks activationCallbacks) {
89-
NoopAuditor noop = new NoopAuditor(activationCallbacks);
95+
NoopAuditor noop = new NoopAuditor("NoopAuditor", activationCallbacks);
9096
final OperationAuditorOptions options =
9197
OperationAuditorOptions.builder()
9298
.withConfiguration(conf)

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/AccessCheckingAuditor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
import java.io.IOException;
2222

23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
2326
import org.apache.hadoop.fs.Path;
2427
import org.apache.hadoop.fs.permission.FsAction;
2528
import org.apache.hadoop.fs.s3a.S3AFileStatus;
@@ -33,10 +36,14 @@ public class AccessCheckingAuditor extends NoopAuditor {
3336
public static final String CLASS =
3437
"org.apache.hadoop.fs.s3a.audit.AccessCheckingAuditor";
3538

39+
private static final Logger LOG =
40+
LoggerFactory.getLogger(AccessCheckingAuditor.class);
41+
3642
/** Flag to enable/disable access. */
3743
private boolean accessAllowed = true;
3844

3945
public AccessCheckingAuditor() {
46+
super("AccessCheckingAuditor");
4047
}
4148

4249
public void setAccessAllowed(final boolean accessAllowed) {
@@ -48,6 +55,9 @@ public boolean checkAccess(final Path path,
4855
final S3AFileStatus status,
4956
final FsAction mode)
5057
throws IOException {
58+
59+
LOG.debug("Check access to {}; allowed={}",
60+
path, accessAllowed);
5161
return accessAllowed;
5262
}
5363
}

0 commit comments

Comments
 (0)