Skip to content
This repository was archived by the owner on Dec 4, 2024. It is now read-only.

Commit bd707ac

Browse files
authored
[DCOS-38138] Update Spark CLI for shell-escape fix (#388)
This commit makes it possible to pass multi-argument configuration strings in the spark-cli.
1 parent 90c16fe commit bd707ac

File tree

16 files changed

+837
-128
lines changed

16 files changed

+837
-128
lines changed

cli/dcos-spark/main.go

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -209,22 +209,22 @@ func (cmd *SparkCommand) runQuotaCreate(a *kingpin.Application, e *kingpin.Parse
209209
requestValues := make([]quotaCreateGuarantee, 0)
210210
if cmd.quotaCpus != 0 {
211211
requestValues = append(requestValues, quotaCreateGuarantee{
212-
Name: "cpus",
213-
Type: "SCALAR",
212+
Name: "cpus",
213+
Type: "SCALAR",
214214
Scalar: quotaScalar{Value: cmd.quotaCpus},
215215
})
216216
}
217217
if cmd.quotaMem != 0 {
218218
requestValues = append(requestValues, quotaCreateGuarantee{
219-
Name: "mem",
220-
Type: "SCALAR",
219+
Name: "mem",
220+
Type: "SCALAR",
221221
Scalar: quotaScalar{Value: cmd.quotaMem},
222222
})
223223
}
224224
if cmd.quotaGpus != 0 {
225225
requestValues = append(requestValues, quotaCreateGuarantee{
226-
Name: "gpus",
227-
Type: "SCALAR",
226+
Name: "gpus",
227+
Type: "SCALAR",
228228
Scalar: quotaScalar{Value: float64(cmd.quotaGpus)},
229229
})
230230
}
@@ -311,7 +311,7 @@ func handleCommands(app *kingpin.Application) {
311311

312312
run := app.Command("run", "Submit a job to the Spark Mesos Dispatcher").Action(cmd.runSubmit)
313313
run.Flag("submit-args", fmt.Sprintf("Arguments matching what would be sent to 'spark-submit': %s",
314-
sparkSubmitHelp())).Required().PlaceHolder("ARGS").StringVar(&cmd.submitArgs)
314+
sparkSubmitHelp())).Required().PlaceHolder("\"ARGS\"").StringVar(&cmd.submitArgs)
315315
// TODO this should be moved to submit args
316316
run.Flag("docker-image", "Docker image to run the job within").
317317
Default("").

cli/dcos-spark/security.go

Lines changed: 9 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,31 +1,30 @@
11
package main
22

33
import (
4-
"fmt"
5-
"errors"
64
"crypto/rand"
75
"encoding/base64"
6+
"errors"
7+
"fmt"
88
"github.com/mesosphere/dcos-commons/cli/client"
99
"strings"
1010
)
1111

12-
1312
const KEYLENGTH = 128
1413

1514
var TASK_TYPES = []string{"driver", "executor"}
1615

1716
var SECRET_REFERENCE_PROPERTIES = map[string]string{
18-
"driver": "spark.mesos.driver.secret.names",
17+
"driver": "spark.mesos.driver.secret.names",
1918
"executor": "spark.mesos.executor.secret.names",
2019
}
2120

2221
var SECRET_FILENAME_PROPERTIES = map[string]string{
23-
"driver": "spark.mesos.driver.secret.filenames",
22+
"driver": "spark.mesos.driver.secret.filenames",
2423
"executor": "spark.mesos.executor.secret.filenames",
2524
}
2625

2726
var SECRET_ENVKEY_PROPERTIES = map[string]string{
28-
"driver": "spark.mesos.driver.secret.envkeys",
27+
"driver": "spark.mesos.driver.secret.envkeys",
2928
"executor": "spark.mesos.executor.secret.envkeys",
3029
}
3130

@@ -34,7 +33,6 @@ const SPARK_KDC_PORT_KEY = "SPARK_SECURITY_KERBEROS_KDC_PORT"
3433
const SPARK_KERBEROS_REALM_KEY = "SPARK_SECURITY_KERBEROS_REALM"
3534
const SPARK_KERBEROS_KRB5_BLOB = "SPARK_MESOS_KRB5_CONF_BASE64"
3635

37-
3836
// utility function used by SASL and Kerberos for user-defined secrets that may be base64 encoded blobs
3937
// basically removes the prefix while ignoring the secret directory structure
4038
func prepareBase64Secret(secretPath string) string {
@@ -44,7 +42,7 @@ func prepareBase64Secret(secretPath string) string {
4442
}
4543

4644
absoluteSecretPath := strings.Split(secretPath, "/")
47-
filename := absoluteSecretPath[len(absoluteSecretPath) - 1]
45+
filename := absoluteSecretPath[len(absoluteSecretPath)-1]
4846
// secrets with __dcos_base64__ will be decoded by Mesos, but remove the prefix here
4947
if strings.HasPrefix(filename, "__dcos_base64__") {
5048
return strings.TrimPrefix(filename, "__dcos_base64__")
@@ -92,7 +90,7 @@ func setupTLSArgs(args *sparkArgs) {
9290
filenames := []string{keyStoreFileName}
9391
envkeys := []string{"DCOS_SPARK_KEYSTORE"}
9492

95-
if args.truststoreSecretPath != "" { // optionally add the truststore configs also
93+
if args.truststoreSecretPath != "" { // optionally add the truststore configs also
9694
addPropertyAndWarn(args, "spark.ssl.trustStore", trustStoreFileName)
9795
setPropertyToDefaultIfNotSet(args, "spark.ssl.trustStorePassword", args.truststorePassword)
9896
paths = append(paths, args.truststoreSecretPath)
@@ -258,8 +256,8 @@ func forwardEnvironmentVariablesFromMarathonConfig(args *sparkArgs, marathonJson
258256

259257
if kdcPropCount > 0 && kdcPropCount != 3 {
260258
client.PrintMessage(
261-
"WARNING: Missing some of the 3 dispatcher environment variables (%s, %s, %s) " +
262-
"required for templating krb5.conf",
259+
"WARNING: Missing some of the 3 dispatcher environment variables (%s, %s, %s) "+
260+
"required for templating krb5.conf",
263261
SPARK_KDC_HOSTNAME_KEY, SPARK_KDC_PORT_KEY, SPARK_KERBEROS_REALM_KEY)
264262
}
265263

cli/dcos-spark/submit_builder.go

Lines changed: 82 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -6,14 +6,16 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9-
"github.com/mesosphere/dcos-commons/cli/client"
10-
"github.com/mesosphere/dcos-commons/cli/config"
11-
"gopkg.in/alecthomas/kingpin.v3-unstable"
129
"log"
1310
"net/url"
1411
"os"
1512
"regexp"
1613
"strings"
14+
15+
"github.com/mattn/go-shellwords"
16+
"github.com/mesosphere/dcos-commons/cli/client"
17+
"github.com/mesosphere/dcos-commons/cli/config"
18+
"gopkg.in/alecthomas/kingpin.v3-unstable"
1719
)
1820

1921
var keyWhitespaceValPattern = regexp.MustCompile("(.+)\\s+(.+)")
@@ -146,8 +148,10 @@ Args:
146148
StringVar(&args.mainClass) // note: spark-submit can autodetect, but only for file://local.jar
147149
submit.Flag("properties-file", "Path to file containing whitespace-separated Spark property defaults.").
148150
PlaceHolder("PATH").ExistingFileVar(&args.propertiesFile)
149-
submit.Flag("conf", "Custom Spark configuration properties.").
150-
PlaceHolder("PROP=VALUE").StringMapVar(&args.properties)
151+
submit.Flag("conf", "Custom Spark configuration properties. "+
152+
"If submitting properties with multiple values, "+
153+
"wrap in single quotes e.g. --conf prop='val1 val2'").
154+
PlaceHolder("prop=value").StringMapVar(&args.properties)
151155
submit.Flag("kerberos-principal", "Principal to be used to login to KDC.").
152156
PlaceHolder("user@REALM").Default("").StringVar(&args.kerberosPrincipal)
153157
submit.Flag("keytab-secret-path", "Path to Keytab in secret store to be used in the Spark drivers").
@@ -280,75 +284,84 @@ func parseApplicationFile(args *sparkArgs) error {
280284
return nil
281285
}
282286

283-
func cleanUpSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {
284-
285-
// collapse two or more spaces to one.
286-
argsCompacted := collapseSpacesPattern.ReplaceAllString(argsStr, " ")
287+
// we use Kingpin to parse CLI commands and options
288+
// spark-submit by convention uses '--arg val' while kingpin only supports --arg=val
289+
// transformSubmitArgs turns the former into the latter
290+
func transformSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {
287291
// clean up any instances of shell-style escaped newlines: "arg1\\narg2" => "arg1 arg2"
288-
argsCleaned := strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsCompacted, " "))
289-
// HACK: spark-submit uses '--arg val' by convention, while kingpin only supports '--arg=val'.
290-
// translate the former into the latter for kingpin to parse.
291-
args := strings.Split(argsCleaned, " ")
292-
argsEquals := make([]string, 0)
293-
appFlags := make([]string, 0)
294-
i := 0
295-
ARGLOOP:
296-
for i < len(args) {
297-
arg := args[i]
298-
if !strings.HasPrefix(arg, "-") {
299-
// looks like we've exited the flags entirely, and are now at the jar and/or args.
300-
// any arguments without a dash at the front should've been joined to preceding keys.
301-
// flush the rest and exit.
302-
for i < len(args) {
303-
arg = args[i]
304-
// if we have a --flag going to the application we need to take the arg (flag) and the value ONLY
305-
// if it's not of the format --flag=val which scopt allows
306-
if strings.HasPrefix(arg, "-") {
307-
appFlags = append(appFlags, arg)
308-
if strings.Contains(arg, "=") || (i+1) >= len(args) {
309-
i += 1
310-
} else {
311-
// if there's a value with this flag, add it
312-
if !strings.HasPrefix(args[i+1], "-") {
313-
appFlags = append(appFlags, args[i+1])
314-
i += 1
315-
}
316-
i += 1
317-
}
318-
} else {
319-
argsEquals = append(argsEquals, arg)
320-
i += 1
321-
}
292+
argsStr = strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsStr, " "))
293+
// collapse two or more spaces to one
294+
argsStr = collapseSpacesPattern.ReplaceAllString(argsStr, " ")
295+
// parse argsStr into []string args maintaining shell escaped sequences
296+
args, err := shellwords.Parse(argsStr)
297+
if err != nil {
298+
log.Fatalf("Could not parse string args correctly. Error: %v", err)
299+
}
300+
sparkArgs, appArgs := make([]string, 0), make([]string, 0)
301+
LOOP:
302+
for i := 0; i < len(args); {
303+
current := strings.TrimSpace(args[i])
304+
switch {
305+
// The main assumption with --submit-args is that all spark-submit flags come before the spark jar URL
306+
// if current is a spark jar/app, we've processed all flags
307+
case isSparkApp(current):
308+
sparkArgs = append(sparkArgs, args[i])
309+
appArgs = append(appArgs, args[i+1:]...)
310+
break LOOP
311+
case strings.HasPrefix(current, "--"):
312+
if isBoolFlag(boolVals, current) {
313+
sparkArgs = append(sparkArgs, current)
314+
i++
315+
continue LOOP
322316
}
323-
break
324-
}
325-
// join this arg to the next arg if...:
326-
// 1. we're not at the last arg in the array
327-
// 2. we start with "--"
328-
// 3. we don't already contain "=" (already joined)
329-
// 4. we aren't a boolean value (no val to join)
330-
if i < len(args)-1 && strings.HasPrefix(arg, "--") && !strings.Contains(arg, "=") {
331-
// check for boolean:
332-
for _, boolVal := range boolVals {
333-
if boolVal.flagName == arg[2:] {
334-
argsEquals = append(argsEquals, arg)
335-
i += 1
336-
continue ARGLOOP
337-
}
317+
if strings.Contains(current, "=") {
318+
// already in the form arg=val, no merge required
319+
sparkArgs = append(sparkArgs, current)
320+
i++
321+
continue LOOP
338322
}
339-
// merge this --key against the following val to get --key=val
340-
argsEquals = append(argsEquals, arg+"="+args[i+1])
323+
// otherwise, merge current with next into form arg=val; eg --driver-memory=512m
324+
next := args[i+1]
325+
sparkArgs = append(sparkArgs, current+"="+next)
341326
i += 2
342-
} else {
343-
// already joined or at the end, pass through:
344-
argsEquals = append(argsEquals, arg)
345-
i += 1
327+
default:
328+
// if not a flag or jar, current is a continuation of the last arg and should not have been split
329+
// eg extraJavaOptions="-Dparam1 -Dparam2" was parsed as [extraJavaOptions, -Dparam1, -Dparam2]
330+
combined := sparkArgs[len(sparkArgs)-1] + " " + current
331+
sparkArgs = append(sparkArgs[:len(sparkArgs)-1], combined)
332+
i++
346333
}
347334
}
348-
client.PrintVerbose("Translated spark-submit arguments: '%s'", argsEquals)
349-
client.PrintVerbose("Translated application arguments: '%s'", appFlags)
335+
if config.Verbose {
336+
client.PrintVerbose("Translated spark-submit arguments: '%s'", strings.Join(sparkArgs, ", "))
337+
client.PrintVerbose("Translated application arguments: '%s'", strings.Join(appArgs, ", "))
338+
}
339+
return sparkArgs, appArgs
340+
}
350341

351-
return argsEquals, appFlags
342+
var acceptedSparkAppExtensions = []string{
343+
".jar",
344+
".py",
345+
".R",
346+
}
347+
348+
func isSparkApp(str string) bool {
349+
for _, ext := range acceptedSparkAppExtensions {
350+
if strings.HasSuffix(str, ext) {
351+
return true
352+
}
353+
}
354+
return false
355+
}
356+
357+
// check if string is a boolean flag (eg --supervise)
358+
func isBoolFlag(boolVals []*sparkVal, str string) bool {
359+
for _, boolVal := range boolVals {
360+
if boolVal.flagName == str[2:] {
361+
return true
362+
}
363+
}
364+
return false
352365
}
353366

354367
func getValsFromPropertiesFile(path string) map[string]string {
@@ -416,7 +429,7 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) (
416429
// then map flags
417430
submit, args := sparkSubmitArgSetup() // setup
418431
// convert and get application flags, add them to the args passed to the spark app
419-
submitArgs, appFlags := cleanUpSubmitArgs(cmd.submitArgs, args.boolVals)
432+
submitArgs, appFlags := transformSubmitArgs(cmd.submitArgs, args.boolVals)
420433
args.appArgs = append(args.appArgs, appFlags...)
421434
_, err := submit.Parse(submitArgs)
422435

@@ -509,7 +522,7 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) (
509522
} else {
510523
client.PrintMessage("Using image '%s' for the driver and the executors (from %s).",
511524
args.properties["spark.mesos.executor.docker.image"], imageSource)
512-
client.PrintMessage("To disable this image on executors, set "+
525+
client.PrintMessage("To disable this image on executors, set " +
513526
"spark.mesos.executor.docker.forcePullImage=false")
514527
args.properties["spark.mesos.executor.docker.forcePullImage"] = "true"
515528
}

0 commit comments

Comments
 (0)