|
1 | 1 | import logging |
2 | 2 | from datetime import date, timedelta |
3 | 3 | from typing import Dict |
4 | | -from urllib.parse import urlparse |
5 | 4 |
|
6 | 5 | import requests |
7 | 6 | from faker import Faker # skipcq: BAN-B410 |
8 | 7 | from lxml.etree import HTMLParser # skipcq: BAN-B410 |
9 | 8 | from lxml.html import document_fromstring |
10 | 9 | from requests import HTTPError, Response |
| 10 | +from requests.exceptions import MissingSchema |
11 | 11 |
|
12 | 12 | from api_app.analyzers_manager.classes import FileAnalyzer |
13 | 13 | from api_app.models import PythonConfig |
@@ -138,25 +138,33 @@ def identify_text_input(self, input_name: str) -> str: |
138 | 138 | return fake_value |
139 | 139 |
|
def extract_action_attribute(self, form) -> str:
    """Resolve the URL that the form's data should be posted to.

    The form's ``action`` attribute is interpreted as follows:

    * missing or empty: ``self.target_site`` is returned as-is;
    * starts with ``/``: treated as a relative path and appended to
      ``self.target_site``;
    * starts with ``http://`` or ``https://``: already an absolute url,
      returned unchanged;
    * contains a ``.`` but no ``://``: treated as a schema-less domain
      and returned unchanged;
    * anything else: treated as a relative path and appended to
      ``self.target_site``.

    :param form: object exposing ``get("action", default)``
        (presumably an lxml form element - confirm against the caller).
    :return: the url the form data will be posted to.
    """
    form_action: str = form.get("action", None)
    join_with_target_site = False

    if not form_action:
        logger.info(
            f"'action' attribute not found in form. Defaulting to {self.target_site=}"
        )
        form_action = self.target_site
    elif form_action.startswith("/"):  # pure relative url
        logger.info(f"Found relative url in {form_action=}")
        # drop the leading '/' because exactly one is re-added when joining
        form_action = form_action[1:]
        join_with_target_site = True
    elif form_action.lower().startswith(("http://", "https://")):
        # absolute url: it must not be glued to the target site, otherwise
        # the result would look like "https://target/https://other/..."
        logger.info(f"Found absolute url in {form_action=}")
    elif (
        "." in form_action and "://" not in form_action
    ):  # found a domain (relative file names such as "login.php" should start with /)
        logger.info(f"Found a domain in form action {form_action=}")
    else:
        join_with_target_site = True

    if join_with_target_site:
        base_site = self.target_site
        # strip a single trailing '/' so the join never yields '//'
        if base_site.endswith("/"):
            base_site = base_site[:-1]
        form_action = base_site + "/" + form_action

    logger.info(f"Extracted action to post data to: {form_action}")

    return form_action
161 | 169 |
|
162 | 170 | def compile_form_field(self, form) -> dict: |
@@ -200,16 +208,29 @@ def perform_request_to_form(self, form) -> Response: |
200 | 208 | headers = { |
201 | 209 | "User-Agent": self.user_agent, |
202 | 210 | } |
203 | | - response = requests.post( |
204 | | - url=dest_url, |
205 | | - data=params, |
206 | | - headers=headers, |
207 | | - proxies=( |
208 | | - {"http": self.proxy_address, "https": self.proxy_address} |
209 | | - if self.proxy_address |
210 | | - else None |
211 | | - ), |
212 | | - ) |
| 211 | + try: |
| 212 | + response = requests.post( |
| 213 | + url=dest_url, |
| 214 | + data=params, |
| 215 | + headers=headers, |
| 216 | + proxies=( |
| 217 | + {"http": self.proxy_address, "https": self.proxy_address} |
| 218 | + if self.proxy_address |
| 219 | + else None |
| 220 | + ), |
| 221 | + ) |
| 222 | + except MissingSchema: |
| 223 | + logger.info(f"Adding default 'https://' schema to {dest_url}") |
| 224 | + response = requests.post( |
| 225 | + url="https://" + dest_url, |
| 226 | + data=params, |
| 227 | + headers=headers, |
| 228 | + proxies=( |
| 229 | + {"http": self.proxy_address, "https": self.proxy_address} |
| 230 | + if self.proxy_address |
| 231 | + else None |
| 232 | + ), |
| 233 | + ) |
213 | 234 | logger.info(f"Request headers: {response.request.headers}") |
214 | 235 | return response |
215 | 236 |
|
|
0 commit comments