6
6
import os
7
7
import subprocess
8
8
import sys
9
+ from typing import Callable , Iterable , List , Union
9
10
import urllib .parse
10
11
import urllib .request
11
12
12
13
from sarif .sarif_file import SarifFileSet
13
14
14
15
16
def _run_git_blame(repo_path: str, file_path: str) -> List[bytes]:
    """
    Run `git blame --porcelain` on a single file and return its raw output.

    The command is executed with `repo_path` as its working directory.  If git exits with a
    non-zero code, a warning is written to stderr and whatever output was captured (possibly
    nothing) is still returned, so that one failing file does not abort the whole operation.

    :param repo_path: Directory in which to run git.
    :param file_path: Path of the file to blame; converted to a git-compatible form first.
    :return: The stdout of the command as a list of byte strings, one per line, each including
        its line terminator.
    """
    cmd = ["git", "blame", "--porcelain", _make_path_git_compatible(file_path)]
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=repo_path) as proc:
        # readlines() already returns a fresh list, so no copy is needed.
        result = proc.stdout.readlines() if proc.stdout else []

        # Ensure process terminates so that returncode is populated
        proc.communicate()
        if proc.returncode:
            cmd_str = " ".join(cmd)
            sys.stderr.write(
                f"WARNING: Command `{cmd_str}` "
                f"failed with exit code {proc.returncode} in {repo_path}\n"
            )

    return result
33
+
34
+
15
35
def enhance_with_blame (
16
- input_files : SarifFileSet , repo_path : str , output : str , output_multiple_files : bool
36
+ input_files : SarifFileSet ,
37
+ repo_path : str ,
38
+ output : str ,
39
+ output_multiple_files : bool ,
40
+ run_git_blame : Callable [[str , str ], List [bytes ]] = _run_git_blame ,
17
41
):
18
42
"""
19
43
Enhance SARIF files with information from `git blame`. The `git` command is run in the current
@@ -26,7 +50,7 @@ def enhance_with_blame(
26
50
if not os .path .isdir (repo_path ):
27
51
raise ValueError (f"No git repository directory found at { repo_path } " )
28
52
29
- _enhance_with_blame (input_files , repo_path )
53
+ _enhance_with_blame (input_files , repo_path , run_git_blame )
30
54
31
55
for input_file in input_files :
32
56
input_file_name = input_file .get_file_name ()
@@ -57,7 +81,11 @@ def enhance_with_blame(
57
81
)
58
82
59
83
60
- def _enhance_with_blame (input_files , repo_path ):
84
+ def _enhance_with_blame (
85
+ input_files : SarifFileSet ,
86
+ repo_path : str ,
87
+ run_git_blame : Callable [[str , str ], List [bytes ]],
88
+ ):
61
89
"""
62
90
Run `git blame --porcelain` for each file path listed in input_files.
63
91
Then enhance the results in error_list by adding a "blame" property including "hash", "author"
@@ -73,7 +101,7 @@ def _enhance_with_blame(input_files, repo_path):
73
101
"in" ,
74
102
repo_path ,
75
103
)
76
- file_blame_info = _run_git_blame_on_files (files_to_blame , repo_path )
104
+ file_blame_info = _run_git_blame_on_files (files_to_blame , repo_path , run_git_blame )
77
105
78
106
# Now join up blame output with result list
79
107
blame_info_count = 0
@@ -106,44 +134,40 @@ def _make_path_git_compatible(file_path):
106
134
return file_path
107
135
108
136
109
- def _run_git_blame_on_files (files_to_blame , repo_path ):
137
+ def _run_git_blame_on_files (
138
+ files_to_blame : Iterable [str ],
139
+ repo_path : str ,
140
+ run_git_blame : Callable [[str , str ], List [bytes ]],
141
+ ):
110
142
file_blame_info = {}
111
143
for file_path in files_to_blame :
112
- cmd = ["git" , "blame" , "--porcelain" , _make_path_git_compatible (file_path )]
113
- with subprocess .Popen (cmd , stdout = subprocess .PIPE , cwd = repo_path ) as proc :
114
- blame_info = {"commits" : {}, "line_to_commit" : {}}
115
- file_blame_info [file_path ] = blame_info
116
- commit_hash : str | None = None
117
- for line_bytes in proc .stdout .readlines ():
118
- # Convert byte sequence to string and remove trailing LF
119
- line_string = line_bytes .decode ("utf-8" )[:- 1 ]
120
- # Now parse output from git blame --porcelain
121
- if commit_hash :
122
- if line_string .startswith ("\t " ):
123
- commit_hash = None
124
- # Ignore line contents = source code
125
- elif " " in line_string :
126
- space_pos = line_string .index (" " )
127
- key = line_string [0 :space_pos ]
128
- value = line_string [space_pos + 1 :].strip ()
129
- blame_info ["commits" ][commit_hash ][key ] = value
130
- else :
131
- # e.g. "boundary"
132
- key = line_string
133
- blame_info ["commits" ][commit_hash ][key ] = True
144
+ git_blame_output = run_git_blame (repo_path , file_path )
145
+ blame_info = {"commits" : {}, "line_to_commit" : {}}
146
+ file_blame_info [file_path ] = blame_info
147
+ commit_hash : Union [str , None ] = None
148
+
149
+ for line_bytes in git_blame_output :
150
+ # Convert byte sequence to string and remove trailing LF
151
+ line_string = line_bytes .decode ("utf-8" )[:- 1 ]
152
+ # Now parse output from git blame --porcelain
153
+ if commit_hash :
154
+ if line_string .startswith ("\t " ):
155
+ commit_hash = None
156
+ # Ignore line contents = source code
157
+ elif " " in line_string :
158
+ space_pos = line_string .index (" " )
159
+ key = line_string [0 :space_pos ]
160
+ value = line_string [space_pos + 1 :].strip ()
161
+ blame_info ["commits" ][commit_hash ][key ] = value
134
162
else :
135
- commit_line_info = line_string .split (" " )
136
- commit_hash = commit_line_info [0 ]
137
- commit_line = commit_line_info [2 ]
138
- blame_info ["commits" ].setdefault (commit_hash , {})
139
- blame_info ["line_to_commit" ][commit_line ] = commit_hash
140
-
141
- # Ensure process terminates
142
- proc .communicate ()
143
- if proc .returncode :
144
- cmd_str = " " .join (cmd )
145
- sys .stderr .write (
146
- f"WARNING: Command `{ cmd_str } "
147
- f"failed with exit code { proc .returncode } in { repo_path } \n "
148
- )
163
+ # e.g. "boundary"
164
+ key = line_string
165
+ blame_info ["commits" ][commit_hash ][key ] = True
166
+ else :
167
+ commit_line_info = line_string .split (" " )
168
+ commit_hash = commit_line_info [0 ]
169
+ commit_line = commit_line_info [2 ]
170
+ blame_info ["commits" ].setdefault (commit_hash , {})
171
+ blame_info ["line_to_commit" ][commit_line ] = commit_hash
172
+
149
173
return file_blame_info
0 commit comments