Skip to content

Commit e2c34a6

Browse files
committed
Update comrade_abe.py
1 parent eef2b01 commit e2c34a6

1 file changed

Lines changed: 139 additions & 113 deletions

File tree

tools/comrade_abe.py

Lines changed: 139 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@
22

33
import argparse
44
import ctypes
5+
import json
6+
import logging
57
import os
68
import sys
9+
import time
10+
from datetime import datetime
711
import comtypes
812
import comtypes.typeinfo
913
import comtypes.automation
1014
import winreg
11-
from dataclasses import dataclass, field
15+
from dataclasses import dataclass, field, asdict
1216
from typing import List, Dict, Any, Optional
1317

1418
try:
@@ -191,7 +195,8 @@ def format_guid_for_cpp(guid_str_or_obj):
191195

192196
class ComInterfaceAnalyzer:
193197
def __init__(self, executable_path=None, verbose=False, target_method_names=None,
194-
expected_decrypt_param_count=3, expected_encrypt_param_count=4):
198+
expected_decrypt_param_count=3, expected_encrypt_param_count=4,
199+
log_file=None):
195200
self.executable_path = executable_path
196201
self.args_verbose = verbose
197202
self.type_lib: Optional[comtypes.POINTER(
@@ -203,12 +208,37 @@ def __init__(self, executable_path=None, verbose=False, target_method_names=None
203208
"DecryptData", "EncryptData"]
204209
self.expected_param_counts = {
205210
"DecryptData": expected_decrypt_param_count, "EncryptData": expected_encrypt_param_count}
211+
212+
# Statistics tracking
213+
self.start_time = None
214+
self.interfaces_scanned = 0
215+
self.interfaces_abe_capable = 0
216+
217+
# Setup file logging if requested
218+
self.logger = None
219+
if log_file:
220+
self.logger = logging.getLogger('ComradeABE')
221+
self.logger.setLevel(logging.DEBUG if verbose else logging.INFO)
222+
handler = logging.FileHandler(log_file, encoding='utf-8')
223+
handler.setFormatter(logging.Formatter(
224+
'%(asctime)s - %(levelname)s - %(message)s',
225+
datefmt='%Y-%m-%d %H:%M:%S'
226+
))
227+
self.logger.addHandler(handler)
206228

207229
def _log(self, message: str, indent: int = 0, verbose_only: bool = False, status_emoji: Optional[str] = None):
208230
if verbose_only and not self.args_verbose:
209231
return
210232
prefix = f"{status_emoji} " if status_emoji else ""
211-
print(f"{' ' * indent}{prefix}{message}")
233+
formatted_msg = f"{' ' * indent}{prefix}{message}"
234+
print(formatted_msg)
235+
236+
# Also log to file if logger is set
237+
if self.logger:
238+
# Remove emoji for cleaner log file
239+
clean_msg = message
240+
level = logging.DEBUG if verbose_only else logging.INFO
241+
self.logger.log(level, clean_msg)
212242

213243
def load_type_library(self) -> bool:
214244
if not self.executable_path:
@@ -545,16 +575,23 @@ def analyze_interfaces_directly(self):
545575
self._log(
546576
f"{EMOJI_GEAR} Analyzing all TKIND_INTERFACE entries from TypeLib...", indent=1)
547577
self.results = []
578+
self.interfaces_scanned = 0
579+
self.interfaces_abe_capable = 0
548580

549581
num_type_infos = 0
550582
try:
551583
num_type_infos = self.type_lib.GetTypeInfoCount()
584+
self._log(f"{EMOJI_INFO} Found {num_type_infos} type definitions to scan", indent=1)
552585
except Exception as e_count:
553586
self._log(
554587
f"{EMOJI_FAILURE} Error getting TypeInfo count: {e_count}", indent=2)
555588
return
556589

557590
for i in range(num_type_infos):
591+
# Progress indicator (every 10%)
592+
if num_type_infos > 10 and i % max(1, num_type_infos // 10) == 0 and i > 0:
593+
progress_pct = (i / num_type_infos) * 100
594+
self._log(f"{EMOJI_SEARCH} Progress: {progress_pct:.0f}% ({i}/{num_type_infos} type definitions scanned)", indent=1)
558595
type_info_obj_main_iter = None
559596
attr_main_iter_ptr = None
560597
interface_name_for_log = f"TypeInfo index {i}"
@@ -574,6 +611,7 @@ def analyze_interfaces_directly(self):
574611
if not attr_main_iter_ptr or attr_main_iter_ptr.typekind != comtypes.typeinfo.TKIND_INTERFACE:
575612
continue
576613

614+
self.interfaces_scanned += 1
577615
interface_iid_str = str(attr_main_iter_ptr.guid)
578616
self._log(
579617
f"Scanning Interface: '{interface_name_for_log}' (IID: {interface_iid_str})", indent=2, verbose_only=True)
@@ -614,6 +652,7 @@ def analyze_interfaces_directly(self):
614652
pass
615653

616654
if all(name in methods_found_in_chain for name in self.target_method_names):
655+
self.interfaces_abe_capable += 1
617656
self._log(
618657
f"{EMOJI_INFO} Found ABE-capable: '{interface_name_for_log}' (IID: {interface_iid_str})", indent=3)
619658
self.results.append(AbeCandidate(
@@ -648,112 +687,9 @@ def analyze_interfaces_directly(self):
648687
self._log(
649688
f"{EMOJI_INFO} No ABE-capable interfaces were found after scanning all TypeInfo entries.", indent=1)
650689

651-
def analyze_interfaces_directly(self):
652-
if not self.type_lib:
653-
self._log(f"{EMOJI_FAILURE} Type library not loaded. Cannot analyze interfaces.",
654-
status_emoji=EMOJI_FAILURE)
655-
return
656-
657-
self._log(
658-
f"{EMOJI_GEAR} Analyzing all TKIND_INTERFACE entries from TypeLib...", indent=1)
659-
self.results = []
660-
661-
num_type_infos = 0
662-
try:
663-
num_type_infos = self.type_lib.GetTypeInfoCount()
664-
except Exception as e_count:
665-
self._log(
666-
f"{EMOJI_FAILURE} Error getting TypeInfo count: {e_count}", indent=2)
667-
return
668-
669-
for i in range(num_type_infos):
670-
type_info = attr = None
671-
interface_name_for_log = f"TypeInfo index {i}"
672-
try:
673-
type_info = self.type_lib.GetTypeInfo(i)
674-
attr = type_info.GetTypeAttr()
675-
676-
try:
677-
interface_name_for_log, _, _, _ = type_info.GetDocumentation(
678-
-1)
679-
except Exception:
680-
pass
681-
682-
if not attr or attr.typekind != comtypes.typeinfo.TKIND_INTERFACE:
683-
continue
684-
685-
interface_iid_str = str(attr.guid)
686-
self._log(
687-
f"Scanning Interface: '{interface_name_for_log}' (IID: {interface_iid_str})", indent=2, verbose_only=True)
688-
689-
current_interface_chain = self.get_inheritance_chain(type_info)
690-
methods_found_in_chain = {}
691-
692-
for iface_in_chain_info in current_interface_chain:
693-
for method_detail in iface_in_chain_info.methods_defined:
694-
method_name = method_detail.name
695-
if method_name in self.target_method_names and method_name not in methods_found_in_chain:
696-
func_desc_check = None
697-
try:
698-
func_desc_check = iface_in_chain_info.type_info_obj.GetFuncDesc(
699-
method_detail.index_in_interface)
700-
if func_desc_check and \
701-
func_desc_check.cParams == self.expected_param_counts.get(method_name, -1) and \
702-
self.check_method_signature(method_name, func_desc_check, iface_in_chain_info.type_info_obj):
703-
methods_found_in_chain[method_name] = AnalyzedMethod(
704-
name=method_name, ovft=method_detail.ovft, memid=method_detail.memid,
705-
defining_interface_name=iface_in_chain_info.name,
706-
defining_interface_iid=iface_in_chain_info.iid
707-
)
708-
self._log(f"'{method_name}' matched signature in '{iface_in_chain_info.name}'.",
709-
indent=4, verbose_only=True, status_emoji=EMOJI_LIGHTBULB)
710-
except comtypes.COMError as e_fd_check:
711-
self._log(
712-
f"{EMOJI_WARNING} COMError checking method '{method_name}' in '{iface_in_chain_info.name}': {e_fd_check}", indent=5, verbose_only=True)
713-
finally:
714-
if func_desc_check and iface_in_chain_info.type_info_obj:
715-
try:
716-
iface_in_chain_info.type_info_obj.ReleaseFuncDesc(
717-
func_desc_check)
718-
except:
719-
pass # Best effort
720-
721-
if all(name in methods_found_in_chain for name in self.target_method_names):
722-
self._log(f"Interface '{interface_name_for_log}' (IID: {interface_iid_str}) IS ABE-capable.",
723-
indent=3, verbose_only=True, status_emoji=EMOJI_SUCCESS)
724-
self.results.append(AbeCandidate(
725-
clsid=self.discovered_clsid or "Unknown CLSID",
726-
interface_name=interface_name_for_log,
727-
interface_iid=interface_iid_str,
728-
methods=methods_found_in_chain,
729-
inheritance_chain_info=current_interface_chain
730-
))
731-
else:
732-
self._log(
733-
f"Interface '{interface_name_for_log}' did not meet all target method criteria.", indent=3, verbose_only=True)
734-
735-
except comtypes.COMError as e_com_loop:
736-
self._log(
737-
f"{EMOJI_FAILURE} COMError processing {interface_name_for_log}: {e_com_loop}", indent=2)
738-
except Exception as e_gen_loop:
739-
self._log(
740-
f"{EMOJI_FAILURE} Generic error processing {interface_name_for_log}: {e_gen_loop}", indent=2)
741-
import traceback
742-
if self.args_verbose:
743-
traceback.print_exc()
744-
finally:
745-
if attr and type_info:
746-
try:
747-
type_info.ReleaseTypeAttr(attr)
748-
except Exception as e_release_loop_attr:
749-
self._log(
750-
f"{EMOJI_WARNING} Error releasing TYPEATTR for '{interface_name_for_log}': {e_release_loop_attr}", indent=3, verbose_only=True)
751-
752-
if not self.results:
753-
self._log(
754-
f"{EMOJI_INFO} No ABE-capable interfaces were found or added to results after scanning all TypeInfo entries.", indent=1)
755690

756691
def analyze(self, scan_mode=False, browser_key_for_scan=None, user_provided_clsid=None):
692+
self.start_time = time.time()
757693
comtypes.CoInitialize()
758694
try:
759695
if scan_mode and browser_key_for_scan:
@@ -785,6 +721,72 @@ def analyze(self, scan_mode=False, browser_key_for_scan=None, user_provided_clsi
785721
self.analyze_interfaces_directly()
786722
finally:
787723
comtypes.CoUninitialize()
724+
725+
def export_to_json(self, output_file: str) -> bool:
726+
"""Export analysis results to JSON format."""
727+
if not self.results:
728+
self._log(f"{EMOJI_FAILURE} No results to export to JSON.", status_emoji=EMOJI_FAILURE)
729+
return False
730+
731+
try:
732+
export_data = {
733+
"metadata": {
734+
"tool": "COMrade ABE Analyzer",
735+
"version": "1.1.0",
736+
"timestamp": datetime.now().isoformat(),
737+
"analysis_duration_seconds": time.time() - self.start_time if self.start_time else 0,
738+
"browser": self.browser_key or "unknown",
739+
"executable_path": self.executable_path or "unknown"
740+
},
741+
"statistics": {
742+
"total_interfaces_scanned": self.interfaces_scanned,
743+
"abe_capable_interfaces_found": self.interfaces_abe_capable,
744+
"target_methods": self.target_method_names
745+
},
746+
"discovered_clsid": self.discovered_clsid or "Unknown",
747+
"results": []
748+
}
749+
750+
for candidate in self.results:
751+
result_entry = {
752+
"interface_name": candidate.interface_name,
753+
"interface_iid": candidate.interface_iid,
754+
"clsid": candidate.clsid,
755+
"methods": {},
756+
"inheritance_chain": []
757+
}
758+
759+
# Add methods
760+
for method_name, method_detail in candidate.methods.items():
761+
result_entry["methods"][method_name] = {
762+
"name": method_detail.name,
763+
"vtable_offset": method_detail.ovft,
764+
"memid": method_detail.memid,
765+
"defining_interface": method_detail.defining_interface_name,
766+
"defining_interface_iid": method_detail.defining_interface_iid
767+
}
768+
769+
# Add inheritance chain
770+
for iface_info in candidate.inheritance_chain_info:
771+
chain_entry = {
772+
"name": iface_info.name,
773+
"iid": iface_info.iid,
774+
"base_interface": iface_info.base_interface_name,
775+
"methods_count": len(iface_info.methods_defined)
776+
}
777+
result_entry["inheritance_chain"].append(chain_entry)
778+
779+
export_data["results"].append(result_entry)
780+
781+
with open(output_file, 'w', encoding='utf-8') as f:
782+
json.dump(export_data, f, indent=2, ensure_ascii=False)
783+
784+
self._log(f"{EMOJI_SUCCESS} JSON export saved to: {output_file}")
785+
return True
786+
787+
except Exception as e:
788+
self._log(f"{EMOJI_FAILURE} Error exporting JSON: {e}", status_emoji=EMOJI_FAILURE)
789+
return False
788790

789791
def generate_cpp_stub_for_chain(self, chain_info_list: List[InterfaceInfo], main_abe_interface_iid: str) -> str:
790792
output_cpp = ""
@@ -914,6 +916,17 @@ def print_results(self, output_cpp_stub_file=None):
914916
print(f" Discovered CLSID : {common_clsid_str}")
915917
if common_clsid_cpp != format_guid_for_cpp(None):
916918
print(f" (C++ Style) : {common_clsid_cpp}")
919+
920+
# Show statistics
921+
if self.start_time:
922+
duration = time.time() - self.start_time
923+
print(f"\n {EMOJI_GEAR} Statistics:")
924+
print(f" Analysis Duration : {duration:.2f} seconds")
925+
print(f" Interfaces Scanned: {self.interfaces_scanned}")
926+
print(f" ABE-Capable Found : {self.interfaces_abe_capable}")
927+
if self.interfaces_scanned > 0:
928+
success_rate = (self.interfaces_abe_capable / self.interfaces_scanned) * 100
929+
print(f" Success Rate : {success_rate:.1f}%")
917930

918931
print(f"\n Found {len(self.results)} ABE-Capable Interface(s):")
919932
chrome_known_good_iid = "{463ABECF-410D-407F-8AF5-0DF35A005CC8}".lower()
@@ -1088,6 +1101,16 @@ def print_banner():
10881101
help="Enable scan mode. In this mode, TARGET should be a browser key ('chrome', 'edge', 'brave').\n"
10891102
"The script will attempt to find the service executable and CLSID from the registry."
10901103
)
1104+
parser.add_argument(
1105+
"--output-json",
1106+
metavar="FILE_PATH",
1107+
help="Export analysis results to JSON format for programmatic consumption."
1108+
)
1109+
parser.add_argument(
1110+
"--log-file",
1111+
metavar="FILE_PATH",
1112+
help="Write detailed logs to the specified file with timestamps."
1113+
)
10911114

10921115
if len(sys.argv) == 1:
10931116
parser.print_help(sys.stderr)
@@ -1105,7 +1128,8 @@ def print_banner():
11051128
verbose=args.verbose,
11061129
target_method_names=[name.strip() for name in args.target_method_names.split(',')],
11071130
expected_decrypt_param_count=args.decrypt_params,
1108-
expected_encrypt_param_count=args.encrypt_params
1131+
expected_encrypt_param_count=args.encrypt_params,
1132+
log_file=args.log_file
11091133
)
11101134

11111135
if args.scan:
@@ -1118,12 +1142,14 @@ def print_banner():
11181142
parser.error(f"Executable path not found: {args.executable_path_or_browser_key}")
11191143
analyzer.executable_path = args.executable_path_or_browser_key
11201144
if args.known_clsid:
1121-
analyzer.discovered_clsid = args.known_clsid
1122-
analyzer.browser_key = "manual_path_input"
1145+
analyzer.discovered_clsid = args.known_clsid
1146+
analyzer.browser_key = "manual_path_input"
11231147
analyzer.analyze(scan_mode=False, user_provided_clsid=args.known_clsid)
11241148

1125-
if analyzer.results or args.verbose:
1126-
print(f"{EMOJI_INFO} Debug: analyzer.results has {len(analyzer.results)} items before printing.")
1127-
11281149
analyzer.print_results(output_cpp_stub_file=args.output_cpp_stub)
1150+
1151+
# Export to JSON if requested
1152+
if args.output_json and analyzer.results:
1153+
analyzer.export_to_json(args.output_json)
1154+
11291155
print(f"\n{EMOJI_SUCCESS} Analysis complete.")

0 commit comments

Comments
 (0)