1
1
import logging
2
- from typing import Dict , Any
2
+ from typing import Dict , Any , List , Optional , Union
3
3
4
4
from client import get_atlan_client
5
5
from pyatlan .model .enums import LineageDirection
6
6
from pyatlan .model .lineage import FluentLineage
7
+ from pyatlan .model .fields .atlan_fields import AtlanField
8
+ from utils .search import SearchUtils
9
+ from utils .constants import DEFAULT_SEARCH_ATTRIBUTES
7
10
8
11
# Configure logging
9
12
logger = logging .getLogger (__name__ )
@@ -15,61 +18,97 @@ def traverse_lineage(
15
18
depth : int = 1000000 ,
16
19
size : int = 10 ,
17
20
immediate_neighbors : bool = False ,
21
+ include_attributes : Optional [List [Union [str , AtlanField ]]] = None ,
18
22
) -> Dict [str , Any ]:
19
23
"""
20
24
Traverse asset lineage in specified direction.
21
25
26
+ By default, essential attributes used in search operations are included.
27
+ Additional attributes can be specified via include_attributes parameter.
28
+
22
29
Args:
23
30
guid (str): GUID of the starting asset
24
31
direction (LineageDirection): Direction to traverse (UPSTREAM or DOWNSTREAM)
25
32
depth (int, optional): Maximum depth to traverse. Defaults to 1000000.
26
33
size (int, optional): Maximum number of results to return. Defaults to 10.
27
- immediate_neighbors (bool, optional): Only return immediate neighbors. Defaults to True.
34
+ immediate_neighbors (bool, optional): Only return immediate neighbors. Defaults to False.
35
+ include_attributes (Optional[List[Union[str, AtlanField]]], optional): List of additional
36
+ attributes to include in results. Can be string attribute names or AtlanField objects.
37
+ These will be added to the default set. Defaults to None.
28
38
29
39
Returns:
30
40
Dict[str, Any]: Dictionary containing:
31
- - assets: List of assets in the lineage
32
- - references: List of dictionaries containing:
33
- - source_guid: GUID of the source asset
34
- - target_guid: GUID of the target asset
35
- - direction: Direction of the reference (upstream/downstream)
41
+ - assets: List of assets in the lineage with processed attributes
42
+ - error: None if no error occurred, otherwise the error message
36
43
37
44
Raises:
38
45
Exception: If there's an error executing the lineage request
39
46
"""
40
- logger .info (f"Starting lineage traversal from { guid } in direction { direction } " )
47
+ logger .info (
48
+ f"Starting lineage traversal from { guid } in direction { direction } , "
49
+ f"depth={ depth } , size={ size } , immediate_neighbors={ immediate_neighbors } "
50
+ )
51
+ logger .debug (f"Include attributes parameter: { include_attributes } " )
41
52
42
53
try :
43
54
# Initialize base request
44
- request = (
55
+ logger .debug ("Initializing FluentLineage object" )
56
+ lineage_builder = (
45
57
FluentLineage (starting_guid = guid )
46
58
.direction (direction )
47
59
.depth (depth )
48
60
.size (size )
49
61
.immediate_neighbors (immediate_neighbors )
50
- .request
51
62
)
52
63
64
+ # Prepare attributes to include: default attributes + additional user-specified attributes
65
+ all_attributes = DEFAULT_SEARCH_ATTRIBUTES .copy ()
66
+
67
+ if include_attributes :
68
+ logger .debug (f"Adding user-specified attributes: { include_attributes } " )
69
+ for attr in include_attributes :
70
+ if isinstance (attr , str ) and attr not in all_attributes :
71
+ all_attributes .append (attr )
72
+
73
+ logger .debug (f"Total attributes to include: { all_attributes } " )
74
+
75
+ # Include all string attributes in results
76
+ for attr_name in all_attributes :
77
+ attr_obj = SearchUtils ._get_asset_attribute (attr_name )
78
+ if attr_obj is None :
79
+ logger .warning (
80
+ f"Unknown attribute for inclusion: { attr_name } , skipping"
81
+ )
82
+ continue
83
+ logger .debug (f"Including attribute: { attr_name } " )
84
+ lineage_builder = lineage_builder .include_on_results (attr_obj )
85
+
53
86
# Execute request
54
- logger .debug ("Executing lineage request" )
55
- client = get_atlan_client ()
56
- response = client .asset .get_lineage_list (request ) # noqa: F821
87
+ logger .debug ("Converting FluentLineage to request object" )
88
+ request = lineage_builder .request
57
89
58
- # Process results
59
- result = {"assets" : []}
90
+ logger .info ("Executing lineage request" )
91
+ client = get_atlan_client ()
92
+ response = client .asset .get_lineage_list (request )
60
93
61
- # Handle None response
94
+ # Process results using same pattern as search
95
+ logger .info ("Processing lineage results" )
62
96
if response is None :
63
97
logger .info ("No lineage results found" )
64
- return result
98
+ return {"assets" : [], "error" : None }
99
+
100
+ # Convert results to list and process using Pydantic serialization
101
+ results_list = [
102
+ result .dict (by_alias = True , exclude_unset = True )
103
+ for result in response
104
+ if result is not None
105
+ ]
106
+
107
+ logger .info (
108
+ f"Lineage traversal completed, returned { len (results_list )} results"
109
+ )
110
+ return {"assets" : results_list , "error" : None }
65
111
66
- assets = []
67
- for item in response :
68
- if item is None :
69
- continue
70
- assets .append (item )
71
- result ["assets" ] = assets
72
- return result
73
112
except Exception as e :
74
113
logger .error (f"Error traversing lineage: { str (e )} " )
75
114
return {"assets" : [], "error" : str (e )}
0 commit comments