Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 100 additions & 35 deletions app/common/download_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import time
from pathlib import Path
from threading import Thread
from abc import ABC, abstractmethod

import curl_cffi
from PySide6.QtCore import QThread, Signal
from loguru import logger

from app.common.config import cfg
from app.common.methods import getProxy, getReadableSize, getLinkInfo, createSparseFile

from app.common.dto import ProgressInfo, SpeedInfo

class DownloadWorker:
"""只能出卖劳动力的最底层工作者"""
Expand All @@ -33,6 +34,49 @@ def __init__(self, fileSize):
self.done: bool = False



class AutoSpeedUpTrait(ABC):
@abstractmethod
def update(self, progress_info: ProgressInfo, task_count: int) -> int:
"""
参数:
progress_info: 进度信息
task_count: 运行中任务数量
返回值:
要额外创建的线程数
"""
raise NotImplementedError

class AutoSpeedUp(AutoSpeedUpTrait):
def __init__(self, accury: float = 0.8, threshold: float = 0.1):
self.accury = accury
self.threshold = threshold
self.formerTaskCount = 0
self.formerSpeed = 0
self.maxSpeedPerTask = 0

def update(self, progress_info: ProgressInfo, task_count: int) -> int:
additionTaskCount = task_count - self.formerTaskCount
self.formerTaskCount = task_count

info = progress_info.getSpeedInfo()
speed_delta = info.speed - self.formerSpeed

if info.speed / task_count > self.maxSpeedPerTask:
self.maxSpeedPerTask = info.speed / task_count

if (self.accury / info.time) ** 0.5 + self.threshold < speed_delta / additionTaskCount:
self.formerSpeed = info.speed
progress_info.reset()
logger.info(f"Task自动加速成功, 平均速度:{info.speed / task_count},最大速度:{self.maxSpeedPerTask},任务数量:{task_count}")
return 1
elif info.time > 60: # 超过 60 秒就重置
progress_info.reset()
return 0
else:
return 0


class DownloadTask(QThread):
"""Task Manager
self.fileSize == -1 表示自动获取; == 0 表示不能并行下载; else 表示正常"""
Expand All @@ -53,7 +97,7 @@ def __init__(
preTaskNum: int = 8,
filePath: str = None,
fileName: str = None,
autoSpeedUp: bool = False,
autoSpeedUpStrategy: AutoSpeedUpTrait | None = AutoSpeedUp(),
fileSize: int = -1,
parent=None,
):
Expand All @@ -65,7 +109,7 @@ def __init__(
self.fileName = fileName
self.filePath = filePath
self.preBlockNum = preTaskNum
self.autoSpeedUp = autoSpeedUp
self.autoSpeedUpStrategy = autoSpeedUpStrategy
self.fileSize = fileSize
self.ableToParallelDownload: bool

Expand Down Expand Up @@ -306,7 +350,7 @@ async def __handleWorker(self, worker: DownloadWorker, context: MutiThreadContex
worker.progress = worker.endPos

if (
not self.autoSpeedUp or context.running_task_count <= self.preBlockNum
not self.autoSpeedUpStrategy or context.running_task_count <= self.preBlockNum
): # 如果开启了自动提速且添加了额外线程,则重新分配工作线程由自动提速控制
self.__reassignWorker(context)
context.running_task_count += 1
Expand Down Expand Up @@ -361,15 +405,15 @@ async def __supervisor(self, context: MutiThreadContext):
self.progress += i.progress - i.startPos + 1
LastProgress = self.progress

if self.autoSpeedUp:
# 初始化变量
maxSpeedPerConnect = 1 # 防止除以 0
additionalTaskNum = (
context.running_task_count
) # 最初为计算每个线程的平均速度
formerAvgSpeed = 0.0 # 提速之前的平均速度
duringTime = 0 # 计算平均速度的时间间隔, 为 10 秒
_ = 0
# if self.autoSpeedUp:
# # 初始化变量
# maxSpeedPerConnect = 1 # 防止除以 0
# additionalTaskNum = (
# context.running_task_count
# ) # 最初为计算每个线程的平均速度
# formerAvgSpeed = 0.0 # 提速之前的平均速度
# duringTime = 0 # 计算平均速度的时间间隔, 为 10 秒
# _ = 0
ghdFile = open(f"{self.filePath}/{self.fileName}.ghd", "wb")
try:
while True: # 由外层cancel退出
Expand Down Expand Up @@ -407,28 +451,28 @@ async def __supervisor(self, context: MutiThreadContext):

# print(f"avgSpeed: {avgSpeed}, historySpeed: {self.historySpeed}")

if self.autoSpeedUp:
if duringTime < 10:
duringTime += 1
else:
duringTime = 0

speedPerConnect = avgSpeed / context.running_task_count
if speedPerConnect > maxSpeedPerConnect:
maxSpeedPerConnect = speedPerConnect
_ = (
0.9 * maxSpeedPerConnect * additionalTaskNum
) + formerAvgSpeed
if avgSpeed >= _:
formerAvgSpeed = avgSpeed
additionalTaskNum = 4
_ = (
0.85 * maxSpeedPerConnect * additionalTaskNum
) + formerAvgSpeed

if context.running_task_count < 253:
for i in range(4):
self.__reassignWorker(context) # 新增线程
# if self.autoSpeedUp:
# if duringTime < 10:
# duringTime += 1
# else:
# duringTime = 0

# speedPerConnect = avgSpeed / context.running_task_count
# if speedPerConnect > maxSpeedPerConnect:
# maxSpeedPerConnect = speedPerConnect
# _ = (
# 0.9 * maxSpeedPerConnect * additionalTaskNum
# ) + formerAvgSpeed
# if avgSpeed >= _:
# formerAvgSpeed = avgSpeed
# additionalTaskNum = 4
# _ = (
# 0.85 * maxSpeedPerConnect * additionalTaskNum
# ) + formerAvgSpeed

# if context.running_task_count < 253:
# for i in range(4):
# self.__reassignWorker(context) # 新增线程

await asyncio.sleep(1)

Expand All @@ -446,6 +490,22 @@ async def __supervisor(self, context: MutiThreadContext):
f"Failed to delete the history file, please delete it manually. Err: {e}"
)

async def autoSpeedUp(self, context: MutiThreadContext):
if self.autoSpeedUpStrategy is None:
return

progressInfo = ProgressInfo(self.progress)
while True:
await asyncio.sleep(1)
progressInfo.progress = self.progress
need_to_create_num = self.autoSpeedUpStrategy.update(
progressInfo, context.running_task_count
)
for i in range(need_to_create_num):
self.__reassignWorker(context)



async def __supervisorSingleThread(self):
LastProgress = 0
while True:
Expand Down Expand Up @@ -475,6 +535,7 @@ async def __main(self):

self.__loadWorkers(context)
supervisorTask = asyncio.create_task(self.__supervisor(context))
autoSpeedUpTask = asyncio.create_task(self.autoSpeedUp(context))
try:
async with context.taskgroup as tg:
for i in context.workers: # 启动 Worker
Expand All @@ -486,7 +547,10 @@ async def __main(self):

finally:
supervisorTask.cancel()
autoSpeedUpTask.cancel()
await supervisorTask
await autoSpeedUpTask


else:
# 单线程部分
Expand Down Expand Up @@ -531,3 +595,4 @@ def run(self):
finally:
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
self.loop.close()

13 changes: 7 additions & 6 deletions app/common/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ def __init__(self, speed = 0, elapsedTime = 1):
else:
raise ValueError("Time cannot be zero")

class SpeedRecorder:
__slot__ = ('progress', 'startTime')
class ProgressInfo:
__slot__ = ("progress", "startTime", "formerProgress")
def __init__(self, progress = 0):
self.formerProgress = progress
self.progress = progress
self.startTime = time()

def reset(self, progress):
self.progress = progress
def reset(self):
self.formerProgress = self.progress
self.startTime = time()

def update(self, progress) -> SpeedInfo:
def getSpeedInfo(self) -> SpeedInfo:
elapsedTime = time() - self.startTime
speed = (progress - self.progress) / elapsedTime
speed = (self.progress - self.formerProgress) / elapsedTime
return SpeedInfo(speed, elapsedTime)

10 changes: 8 additions & 2 deletions app/components/task_card.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .custom_components import TaskProgressBar
from .custom_dialogs import DelDialog, CustomInputDialog
from ..common.config import cfg
from ..common.download_task import DownloadTask
from ..common.download_task import DownloadTask, AutoSpeedUp
from ..common.methods import getReadableSize, openFile, openFolder
from ..view.pop_up_window import FinishedPopUpWindow

Expand Down Expand Up @@ -126,7 +126,13 @@ def __launchTask(self):
def __instantiateTask(self, url: str, filePath: str, preBlockNum: int, headers: dict, fileSize: int = -1,
fileName: str = None):
autoSpeedUp = cfg.autoSpeedUp.value
self.task = DownloadTask(url, headers, preBlockNum, filePath, fileName, autoSpeedUp, fileSize)

if autoSpeedUp:
self.task = DownloadTask(url, headers, preBlockNum, filePath, fileName, AutoSpeedUp(), fileSize)
else:
self.task = DownloadTask(
url, headers, preBlockNum, filePath, fileName, None, fileSize
)

def updateTaskRecord(self, newStatus: str):
recordPath = "{}/Ghost Downloader 记录文件".format(cfg.appPath)
Expand Down