|
1 | 1 | package exporter |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "encoding/json" |
| 5 | + "fmt" |
4 | 6 | "log" |
5 | 7 | "strings" |
6 | 8 |
|
7 | 9 | sdk_compute "github.com/databricks/databricks-sdk-go/service/compute" |
| 10 | + "github.com/databricks/databricks-sdk-go/service/pipelines" |
| 11 | + "github.com/databricks/terraform-provider-databricks/common" |
| 12 | + tf_dlt "github.com/databricks/terraform-provider-databricks/pipelines" |
| 13 | + tf_workspace "github.com/databricks/terraform-provider-databricks/workspace" |
| 14 | + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" |
8 | 15 | ) |
9 | 16 |
|
10 | 17 | func listClusters(ic *importContext) error { |
@@ -93,3 +100,348 @@ func (ic *importContext) importCluster(c *sdk_compute.ClusterSpec) { |
93 | 100 | }) |
94 | 101 | } |
95 | 102 | } |
| 103 | + |
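| | +// listInstancePools emits a databricks_instance_pool resource for every pool in the
| | +// workspace whose name matches the import filter.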
| 104 | +func listInstancePools(ic *importContext) error { |
| 105 | + it := ic.workspaceClient.InstancePools.List(ic.Context) |
| 106 | + i := 0 |
| 107 | + for it.HasNext(ic.Context) { |
| 108 | + pool, err := it.Next(ic.Context) |
| 109 | + if err != nil { |
| 110 | + return err |
| 111 | + } |
| 112 | + i++ |
| 113 | + if !ic.MatchesName(pool.InstancePoolName) { |
| 114 | + continue |
| 115 | + } |
| 116 | + ic.Emit(&resource{ |
| 117 | + Resource: "databricks_instance_pool", |
| 118 | + ID: pool.InstancePoolId, |
| 119 | + }) |
| 120 | + if i%50 == 0 { |
| 121 | + log.Printf("[INFO] Scanned %d instance pools", i)
| 122 | + } |
| 123 | + } |
| 124 | + return nil |
| 125 | +} |
| 126 | + |
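| | +// instancePoolName returns the configured pool name; when it is empty, it falls back to
| | +// the third dash-separated token of the pool ID (pool IDs typically look like
| | +// "0727-104344-hauls13-pool-...").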
| 127 | +func instancePoolName(ic *importContext, d *schema.ResourceData) string { |
| 128 | + raw, ok := d.GetOk("instance_pool_name") |
| 129 | + if !ok || raw.(string) == "" { |
| 130 | + return strings.Split(d.Id(), "-")[2] |
| 131 | + } |
| 132 | + return raw.(string) |
| 133 | +} |
| 134 | + |
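| | +// importInstancePool emits the permissions attached to the instance pool.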
| 135 | +func importInstancePool(ic *importContext, r *resource) error { |
| 136 | + ic.emitPermissionsIfNotIgnored(r, fmt.Sprintf("/instance-pools/%s", r.ID), |
| 137 | + "inst_pool_"+ic.Importables["databricks_instance_pool"].Name(ic, r.Data)) |
| 138 | + return nil |
| 139 | +} |
| 140 | + |
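| | +// importCluster emits the resources referenced by the cluster spec, the cluster's
| | +// permissions, and its installed libraries.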
| 141 | +func importCluster(ic *importContext, r *resource) error { |
| 142 | + var c sdk_compute.ClusterSpec |
| 143 | + s := ic.Resources["databricks_cluster"].Schema |
| 144 | + common.DataToStructPointer(r.Data, s, &c) |
| 145 | + ic.importCluster(&c) |
| 146 | + ic.emitPermissionsIfNotIgnored(r, fmt.Sprintf("/clusters/%s", r.ID), |
| 147 | + "cluster_"+ic.Importables["databricks_cluster"].Name(ic, r.Data)) |
| 148 | + return ic.importClusterLibraries(r.Data) |
| 149 | +} |
| 150 | + |
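| | +// listClusterPolicies emits cluster policies, skipping built-in policy families that
| | +// were neither renamed nor given definition overrides.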
| 151 | +func listClusterPolicies(ic *importContext) error { |
| 152 | + builtInClusterPolicies := ic.getBuiltinPolicyFamilies() |
| 153 | + it := ic.workspaceClient.ClusterPolicies.List(ic.Context, sdk_compute.ListClusterPoliciesRequest{}) |
| 154 | + i := 0 |
| 155 | + for it.HasNext(ic.Context) { |
| 156 | + policy, err := it.Next(ic.Context) |
| 157 | + if err != nil { |
| 158 | + return err |
| 159 | + } |
| 160 | + i++ |
| 161 | + family, isBuiltin := builtInClusterPolicies[policy.PolicyFamilyId] |
| 162 | + if policy.PolicyFamilyId != "" && isBuiltin && family.Name == policy.Name && |
| 163 | + policy.PolicyFamilyDefinitionOverrides == "" { |
| 164 | + log.Printf("[DEBUG] Skipping builtin cluster policy '%s' without overrides", policy.Name) |
| 165 | + continue |
| 166 | + } |
| 167 | + if !ic.MatchesName(policy.Name) { |
| 168 | + log.Printf("[DEBUG] Policy %s doesn't match %s filter", policy.Name, ic.match) |
| 169 | + continue |
| 170 | + } |
| 171 | + ic.Emit(&resource{ |
| 172 | + Resource: "databricks_cluster_policy", |
| 173 | + ID: policy.PolicyId, |
| 174 | + }) |
| 175 | + if i%10 == 0 { |
| 176 | + log.Printf("[INFO] Scanned %d cluster policies", i) |
| 177 | + } |
| 178 | + } |
| 179 | + return nil |
| 180 | +} |
| 181 | + |
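| | +// importClusterPolicy emits the policy's permissions and scans its JSON definition and
| | +// attached libraries for references to other exportable resources.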
| 182 | +func importClusterPolicy(ic *importContext, r *resource) error { |
| 183 | + ic.emitPermissionsIfNotIgnored(r, fmt.Sprintf("/cluster-policies/%s", r.ID), |
| 184 | + "cluster_policy_"+ic.Importables["databricks_cluster_policy"].Name(ic, r.Data)) |
| 185 | + |
| 186 | + var clusterPolicy sdk_compute.Policy |
| 187 | + s := ic.Resources["databricks_cluster_policy"].Schema |
| 188 | + common.DataToStructPointer(r.Data, s, &clusterPolicy) |
| 189 | + |
| 190 | + var definition map[string]map[string]any |
| 191 | + err := json.Unmarshal([]byte(clusterPolicy.Definition), &definition) |
| 192 | + if err != nil { |
| 193 | + return err |
| 194 | + } |
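| | + // Each definition entry maps a cluster attribute path to a policy element such as
| | + // {"type": "fixed", "value": ...}; fixed values may reference other resources.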
| 195 | + for k, policy := range definition { |
| 196 | + value, vok := policy["value"] |
| 197 | + defaultValue, dok := policy["defaultValue"] |
| 198 | + typ := policy["type"] |
| 199 | + if !vok && !dok { |
| 200 | + log.Printf("[DEBUG] Skipping policy element as it has neither value nor defaultValue. k='%v', policy='%v'",
| 201 | + k, policy) |
| 202 | + continue |
| 203 | + } |
| 204 | + if k == "aws_attributes.instance_profile_arn" { |
| 205 | + ic.Emit(&resource{ |
| 206 | + Resource: "databricks_instance_profile", |
| 207 | + ID: eitherString(value, defaultValue), |
| 208 | + }) |
| 209 | + } |
| 210 | + if k == "instance_pool_id" || k == "driver_instance_pool_id" { |
| 211 | + ic.Emit(&resource{ |
| 212 | + Resource: "databricks_instance_pool", |
| 213 | + ID: eitherString(value, defaultValue), |
| 214 | + }) |
| 215 | + } |
| 216 | + if typ == "fixed" && strings.HasPrefix(k, "init_scripts.") && |
| 217 | + strings.HasSuffix(k, ".dbfs.destination") { |
| 218 | + ic.emitIfDbfsFile(eitherString(value, defaultValue)) |
| 219 | + } |
| 220 | + if typ == "fixed" && strings.HasPrefix(k, "init_scripts.") && |
| 221 | + strings.HasSuffix(k, ".volumes.destination") { |
| 222 | + ic.emitIfVolumeFile(eitherString(value, defaultValue)) |
| 223 | + } |
| 224 | + if typ == "fixed" && strings.HasPrefix(k, "init_scripts.") && |
| 225 | + strings.HasSuffix(k, ".workspace.destination") { |
| 226 | + ic.emitWorkspaceFileOrRepo(eitherString(value, defaultValue)) |
| 227 | + } |
| 228 | + if typ == "fixed" && (strings.HasPrefix(k, "spark_conf.") || strings.HasPrefix(k, "spark_env_vars.")) { |
| 229 | + ic.emitSecretsFromSecretPathString(eitherString(value, defaultValue)) |
| 230 | + } |
| 231 | + } |
| 232 | + |
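| | + // Libraries attached to the policy may live on DBFS, in the workspace, or on UC volumes.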
| 233 | + for _, lib := range clusterPolicy.Libraries { |
| 234 | + ic.emitIfDbfsFile(lib.Whl) |
| 235 | + ic.emitIfDbfsFile(lib.Jar) |
| 236 | + ic.emitIfDbfsFile(lib.Egg) |
| 237 | + ic.emitIfWsfsFile(lib.Whl) |
| 238 | + ic.emitIfWsfsFile(lib.Jar) |
| 239 | + ic.emitIfWsfsFile(lib.Egg) |
| 240 | + ic.emitIfVolumeFile(lib.Whl) |
| 241 | + ic.emitIfVolumeFile(lib.Jar) |
| 242 | + } |
| 243 | + |
| 244 | + policyFamilyId := clusterPolicy.PolicyFamilyId |
| 245 | + if policyFamilyId != "" && clusterPolicy.Definition != "" { |
| 246 | + // clear the definition: otherwise it would end up in the generated HCL for the
| 247 | + // data source, which supports only the `name` attribute
| 248 | + r.Data.Set("definition", "") |
| 249 | + builtInClusterPolicies := ic.getBuiltinPolicyFamilies() |
| 250 | + v, isBuiltin := builtInClusterPolicies[policyFamilyId] |
| 251 | + if isBuiltin && clusterPolicy.PolicyFamilyDefinitionOverrides == "" && v.Name == clusterPolicy.Name { |
| 252 | + r.Mode = "data" |
| 253 | + } |
| 254 | + } |
| 255 | + |
| 256 | + return nil |
| 257 | +} |
| 258 | + |
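| | +// listPipelines emits DLT pipelines; in incremental mode each pipeline is fetched
| | +// individually to obtain its last-modified timestamp.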
| 259 | +func listPipelines(ic *importContext) error { |
| 260 | + it := ic.workspaceClient.Pipelines.ListPipelines(ic.Context, pipelines.ListPipelinesRequest{ |
| 261 | + MaxResults: 100, |
| 262 | + }) |
| 263 | + i := 0 |
| 264 | + for it.HasNext(ic.Context) { |
| 265 | + p, err := it.Next(ic.Context)
| 266 | + if err != nil { |
| 267 | + return err |
| 268 | + } |
| 269 | + i++ |
| 270 | + if !ic.MatchesName(p.Name) {
| 271 | + continue |
| 272 | + } |
| 273 | + var modifiedAt int64 |
| 274 | + if ic.incremental { |
| 275 | + pipeline, err := ic.workspaceClient.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{ |
| 276 | + PipelineId: p.PipelineId,
| 277 | + }) |
| 278 | + if err != nil { |
| 279 | + return err |
| 280 | + } |
| 281 | + modifiedAt = pipeline.LastModified |
| 282 | + } |
| 283 | + ic.EmitIfUpdatedAfterMillis(&resource{ |
| 284 | + Resource: "databricks_pipeline", |
| 285 | + ID: p.PipelineId,
| 286 | + }, modifiedAt, fmt.Sprintf("DLT Pipeline '%s'", p.Name))
| 287 | + if i%100 == 0 { |
| 288 | + log.Printf("[INFO] Scanned %d DLT pipelines", i)
| 289 | + } |
| 290 | + } |
| 291 | + log.Printf("[INFO] Listed %d DLT pipelines", i) |
| 292 | + return nil |
| 293 | +} |
| 294 | + |
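| | +// importPipeline emits everything a DLT pipeline depends on: its target schema, the
| | +// notebooks and files referenced by its libraries, cluster-level resources, permissions,
| | +// notification recipients, the event log table, run-as principals, and environment
| | +// dependencies. Pipelines deployed via asset bundles are skipped.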
| 295 | +func importPipeline(ic *importContext, r *resource) error { |
| 296 | + var pipeline tf_dlt.Pipeline |
| 297 | + s := ic.Resources["databricks_pipeline"].Schema |
| 298 | + common.DataToStructPointer(r.Data, s, &pipeline) |
| 299 | + if pipeline.Deployment != nil && pipeline.Deployment.Kind == "BUNDLE" { |
| 300 | + log.Printf("[INFO] Skipping DLT Pipeline with ID %s (%s) as it is deployed via DABs",
| 301 | + r.ID, pipeline.Name) |
| 302 | + return nil |
| 303 | + } |
| 304 | + if pipeline.Catalog != "" { |
| 305 | + var schemaName string |
| 306 | + if pipeline.Target != "" { |
| 307 | + schemaName = pipeline.Target |
| 308 | + } else if pipeline.Schema != "" { |
| 309 | + schemaName = pipeline.Schema |
| 310 | + } |
| 311 | + if schemaName != "" { |
| 312 | + ic.Emit(&resource{ |
| 313 | + Resource: "databricks_schema", |
| 314 | + ID: pipeline.Catalog + "." + schemaName,
| 315 | + }) |
| 316 | + } |
| 317 | + } |
| 318 | + if pipeline.Deployment == nil || pipeline.Deployment.Kind != "BUNDLE" { |
| 319 | + nbAPI := tf_workspace.NewNotebooksAPI(ic.Context, ic.Client) |
| 320 | + for _, lib := range pipeline.Libraries { |
| 321 | + if lib.Notebook != nil { |
| 322 | + ic.emitNotebookOrRepo(lib.Notebook.Path) |
| 323 | + } |
| 324 | + if lib.File != nil { |
| 325 | + ic.emitWorkspaceFileOrRepo(lib.File.Path) |
| 326 | + } |
| 327 | + if lib.Glob != nil { |
| 328 | + if strings.HasSuffix(lib.Glob.Include, "/**") { |
| 329 | + // Emit all files and notebooks under the directory |
| 330 | + dirPath := strings.TrimSuffix(lib.Glob.Include, "/**") |
| 331 | + ic.emitDirectoryOrRepo(dirPath) |
| 332 | + objects, err := nbAPI.List(dirPath, false, true) |
| 333 | + if err == nil { |
| 334 | + for _, object := range objects { |
| 335 | + switch object.ObjectType { |
| 336 | + case tf_workspace.File: |
| 337 | + ic.emitWorkspaceFileOrRepo(object.Path) |
| 338 | + case tf_workspace.Notebook: |
| 339 | + ic.emitNotebookOrRepo(object.Path) |
| 340 | + } |
| 341 | + } |
| 342 | + } else { |
| 343 | + log.Printf("[WARN] Can't list directory %s for DLT pipeline %s", dirPath, pipeline.Name)
| 344 | + } |
| 345 | + } else { |
| 346 | + // Perform get-status call to check what is the object type |
| 347 | + object, err := nbAPI.GetStatus(lib.Glob.Include, false) |
| 348 | + if err == nil { |
| 349 | + switch object.ObjectType { |
| 350 | + case tf_workspace.File: |
| 351 | + ic.emitWorkspaceFileOrRepo(lib.Glob.Include) |
| 352 | + case tf_workspace.Notebook: |
| 353 | + ic.emitNotebookOrRepo(lib.Glob.Include) |
| 354 | + } |
| 355 | + } else { |
| 356 | + log.Printf("[WARN] Can't get status of %s for DLT pipeline %s", lib.Glob.Include, pipeline.Name) |
| 357 | + } |
| 358 | + } |
| 359 | + } |
| 360 | + ic.emitIfDbfsFile(lib.Jar) |
| 361 | + ic.emitIfDbfsFile(lib.Whl) |
| 362 | + } |
| 363 | + if pipeline.RootPath != "" { |
| 364 | + ic.emitDirectoryOrRepo(pipeline.RootPath) |
| 365 | + } |
| 366 | + } |
| 367 | + // Emit clusters |
| 368 | + for _, cluster := range pipeline.Clusters { |
| 369 | + if cluster.AwsAttributes != nil && cluster.AwsAttributes.InstanceProfileArn != "" { |
| 370 | + ic.Emit(&resource{ |
| 371 | + Resource: "databricks_instance_profile", |
| 372 | + ID: cluster.AwsAttributes.InstanceProfileArn, |
| 373 | + }) |
| 374 | + } |
| 375 | + if cluster.InstancePoolId != "" { |
| 376 | + ic.Emit(&resource{ |
| 377 | + Resource: "databricks_instance_pool", |
| 378 | + ID: cluster.InstancePoolId, |
| 379 | + }) |
| 380 | + } |
| 381 | + if cluster.DriverInstancePoolId != "" { |
| 382 | + ic.Emit(&resource{ |
| 383 | + Resource: "databricks_instance_pool", |
| 384 | + ID: cluster.DriverInstancePoolId, |
| 385 | + }) |
| 386 | + } |
| 387 | + if cluster.PolicyId != "" { |
| 388 | + ic.Emit(&resource{ |
| 389 | + Resource: "databricks_cluster_policy", |
| 390 | + ID: cluster.PolicyId, |
| 391 | + }) |
| 392 | + } |
| 393 | + ic.emitInitScripts(cluster.InitScripts) |
| 394 | + ic.emitSecretsFromSecretsPathMap(cluster.SparkConf) |
| 395 | + ic.emitSecretsFromSecretsPathMap(cluster.SparkEnvVars) |
| 396 | + } |
| 397 | + ic.emitFilesFromMap(pipeline.Configuration) |
| 398 | + ic.emitSecretsFromSecretsPathMap(pipeline.Configuration) |
| 399 | + ic.emitPermissionsIfNotIgnored(r, fmt.Sprintf("/pipelines/%s", r.ID), |
| 400 | + "pipeline_"+ic.Importables["databricks_pipeline"].Name(ic, r.Data)) |
| 401 | + if pipeline.Notifications != nil { |
| 402 | + for _, n := range pipeline.Notifications { |
| 403 | + ic.emitListOfUsers(n.EmailRecipients) |
| 404 | + } |
| 405 | + } |
| 406 | + if pipeline.EventLog != nil { |
| 407 | + var catalogName, schemaName string |
| 408 | + if pipeline.EventLog.Catalog != "" { |
| 409 | + catalogName = pipeline.EventLog.Catalog |
| 410 | + } else { |
| 411 | + catalogName = pipeline.Catalog |
| 412 | + } |
| 413 | + if pipeline.EventLog.Schema != "" { |
| 414 | + schemaName = pipeline.EventLog.Schema |
| 415 | + } else { |
| 416 | + schemaName = pipeline.Schema |
| 417 | + } |
| 418 | + if catalogName != "" && schemaName != "" && pipeline.EventLog.Name != "" { |
| 419 | + ic.Emit(&resource{ |
| 420 | + Resource: "databricks_sql_table", |
| 421 | + ID: catalogName + "." + schemaName + "." + pipeline.EventLog.Name, |
| 422 | + }) |
| 423 | + } |
| 424 | + } |
| 425 | + if pipeline.RunAs != nil { |
| 426 | + if pipeline.RunAs.UserName != "" { |
| 427 | + ic.Emit(&resource{ |
| 428 | + Resource: "databricks_user", |
| 429 | + Attribute: "user_name", |
| 430 | + Value: pipeline.RunAs.UserName, |
| 431 | + }) |
| 432 | + } |
| 433 | + if pipeline.RunAs.ServicePrincipalName != "" { |
| 434 | + ic.Emit(&resource{ |
| 435 | + Resource: "databricks_service_principal", |
| 436 | + Attribute: "application_id", |
| 437 | + Value: pipeline.RunAs.ServicePrincipalName, |
| 438 | + }) |
| 439 | + } |
| 440 | + } |
| 441 | + if pipeline.Environment != nil { |
| 442 | + for _, dep := range pipeline.Environment.Dependencies { |
| 443 | + emitEnvironmentDependency(ic, dep) |
| 444 | + } |
| 445 | + } |
| 446 | + return nil |
| 447 | +} |