Merged
185 changes: 94 additions & 91 deletions api_app/analyzers_manager/file_analyzers/cape_sandbox.py
@@ -7,6 +7,7 @@
 from typing import Dict

 import requests
+from billiard.exceptions import SoftTimeLimitExceeded

 from api_app.analyzers_manager.classes import FileAnalyzer
 from api_app.analyzers_manager.exceptions import AnalyzerRunException
@@ -54,6 +55,10 @@ class ContinuePolling(Exception):
     # CapeSandbox SSL certificate (multiline string).
     _certificate: str

+    @classmethod
+    def update(cls) -> bool:
+        pass
+
     @staticmethod
     def _clean_certificate(cert):
         return (
Expand Down Expand Up @@ -220,117 +225,115 @@ def __poll_for_result(
self,
task_id,
) -> dict:
# decreasing timeout:
# average analysis starting time -> the request is still made in case of mock
# real analysis duration with poll distance of 1 minute + the remaining module
# empirical value of processing duration time
# real polling with max tries
# final attempts in case there is a bottleneck
# decreasing timeout
timeout_attempts = (
[30]
+ [60] * (self.timeout // 60)
+ [self.timeout % 60]
+ [50]
+ [self.poll_distance] * self.max_tries
+ [60] * 3
[30] # avg time to start the machine and the analysis
+ [60] * (self.timeout // 60) # time of the analysis
+ [90] # avg time of processing time
+ [self.poll_distance] * self.max_tries # attempts in the best time window
+ [30] * 20 # exceed soft time limit in order to generate the error
)

tot_time = sum(timeout_attempts)
if tot_time > 600:
logger.warning(
f" Job: {self.job_id} -> "
"Broken soft time limit!! "
f"The analysis in the worst case will last {tot_time} seconds"
)

results = None
success = False
status_api = self._url_key_name + "/apiv2/tasks/status/" + str(task_id)
is_pending = True

while is_pending: # starts the for loop when we are not in pending state
is_pending = False # ends the timeouts loop but only this time
for try_, curr_timeout in enumerate(timeout_attempts):
attempt = try_ + 1
try:
logger.info(
f" Job: {self.job_id} -> "
f"Starting poll number #{attempt}/{len(timeout_attempts)}"
)

request = self.__single_poll(status_api)

# in case the request was ok
responded_json = request.json()
error = responded_json.get("error")
data = responded_json.get("data")

logger.info(
f"Job: {self.job_id} -> "
f"Status of the CAPESandbox task: {data}"
)

if error:
raise AnalyzerRunException(error)

if data == "pending":
is_pending = True
try:
while is_pending: # starts the for loop when we are not in pending state
is_pending = False # ends the timeouts loop but only this time
for try_, curr_timeout in enumerate(timeout_attempts):
attempt = try_ + 1
try:
logger.info(
f" Job: {self.job_id} -> "
"Waiting for the pending status to end, "
"sleeping for 15 seconds..."
)
time.sleep(15)
break

if data in ("running", "processing"):
raise self.ContinuePolling(f"Task still {data}")

if data in ("reported", "completed"):
report_url = (
self._url_key_name
+ "/apiv2/tasks/get/report/"
+ str(task_id)
+ "/litereport"
f"Starting poll number #{attempt}/{len(timeout_attempts)}"
)

results = self.__single_poll(report_url, polling=False).json()
request = self.__single_poll(status_api)

# the task was being processed
if (
"error" in results
and results["error"]
and results["error_value"] == "Task is still being analyzed"
):
raise self.ContinuePolling("Task still processing")
# in case the request was ok
responded_json = request.json()
error = responded_json.get("error")
data = responded_json.get("data")

logger.info(
f" Job: {self.job_id} ->"
f"Poll number #{attempt}/{len(timeout_attempts)} fetched"
" the results of the analysis."
" stopping polling.."
f"Job: {self.job_id} -> "
f"Status of the CAPESandbox task: {data}"
)
success = True

break

else:
raise AnalyzerRunException(
f"status {data} was unexpected. Check the code"
if error:
raise AnalyzerRunException(error)

if data == "pending":
is_pending = True
logger.info(
f" Job: {self.job_id} -> "
"Waiting for the pending status to end, "
"sleeping for 15 seconds..."
)
time.sleep(15)
break

if data in ("running", "processing"):
raise self.ContinuePolling(f"Task still {data}")

if data in ("reported", "completed"):
report_url = (
self._url_key_name
+ "/apiv2/tasks/get/report/"
+ str(task_id)
+ "/litereport"
)

results = self.__single_poll(
report_url, polling=False
).json()

# the task was being processed
if (
"error" in results
and results["error"]
and results["error_value"]
== "Task is still being analyzed"
):
raise self.ContinuePolling("Task still processing")

logger.info(
f" Job: {self.job_id} ->"
f"Poll number #{attempt}/{len(timeout_attempts)} "
"fetched the results of the analysis."
" stopping polling.."
)
success = True

break

else:
raise AnalyzerRunException(
f"status {data} was unexpected. Check the code"
)

except self.ContinuePolling as e:
logger.info(
f"Job: {self.job_id} -> "
"Continuing the poll at attempt number: "
f"#{attempt}/{len(timeout_attempts)}. {e}. "
f"Sleeping for {curr_timeout} seconds."
)
if try_ != self.max_tries - 1: # avoiding useless last sleep
time.sleep(curr_timeout)

except self.ContinuePolling as e:
logger.info(
f"Job: {self.job_id} -> "
"Continuing the poll at attempt number: "
f"#{attempt}/{len(timeout_attempts)}. {e}. "
f"Sleeping for {curr_timeout} seconds."
)
if try_ != self.max_tries - 1: # avoiding useless last sleep
time.sleep(curr_timeout)
if not success:
raise AnalyzerRunException(f"{self.job_id} poll ended without results")

except SoftTimeLimitExceeded:
self._handle_exception(
"Soft Time Limit Exceeded: "
f"{self._url_key_name + '/analysis/' + str(task_id)}",
is_base_err=True,
)

if not success:
raise AnalyzerRunException(f"{self.job_id} poll ended without results")
return results

@classmethod
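Note on the new schedule (a reviewer-style sketch, not part of the PR): the trailing [30] * 20 entries replace the old "Broken soft time limit" warning; instead of only warning, the loop is now sized to outlast the Celery soft time limit when CAPE never returns a report, so that the new except SoftTimeLimitExceeded branch reports the failure through _handle_exception. A rough worst-case budget is sketched below; timeout, poll_distance and max_tries are assumed example values for illustration, not the analyzer's configured defaults.

# Hypothetical worst-case polling budget for the new schedule.
# The three values below are placeholders, not taken from this PR.
timeout, poll_distance, max_tries = 500, 30, 25

timeout_attempts = (
    [30]                           # machine/analysis start-up
    + [60] * (timeout // 60)       # analysis window
    + [90]                         # report processing
    + [poll_distance] * max_tries  # best-case polling window
    + [30] * 20                    # padding meant to outlast the soft time limit
)
print(sum(timeout_attempts))  # 1950 seconds with these example values

With these placeholders the schedule tops out around 32 minutes, comfortably past a typical task-level soft limit, which is the point of the padding.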
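On the SoftTimeLimitExceeded handling itself: Celery delivers billiard's SoftTimeLimitExceeded asynchronously inside the task body once soft_time_limit expires, so it can surface at any point of the polling loop; wrapping the whole loop in a single try is what routes the failure into _handle_exception instead of letting the task error out wherever the exception lands. A minimal sketch of that mechanism, assuming a throwaway Celery app and a 300-second soft limit (neither is defined by this PR):

import time

from billiard.exceptions import SoftTimeLimitExceeded
from celery import Celery

app = Celery("sketch", broker="memory://")  # assumed throwaway app, not IntelOwl's


@app.task(soft_time_limit=300)  # assumed limit, for illustration only
def poll_sandbox():
    try:
        while True:  # stand-in for the CAPE polling loop
            time.sleep(30)
    except SoftTimeLimitExceeded:
        # the worker injects the exception here once 300 seconds elapse,
        # leaving the task a chance to log and return before termination
        return "soft time limit hit, reported gracefully"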