From 65172a3d3de621c4ceeb2ac942ed66842a98fd6e Mon Sep 17 00:00:00 2001
From: itouakirai
Date: Thu, 6 Mar 2025 08:38:15 +0800
Subject: [PATCH] fix: blocked download goroutines
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 utils/runv3/runv3.go | 122 ++++++++++++++++++++++++++++++-------------
 1 file changed, 85 insertions(+), 37 deletions(-)

diff --git a/utils/runv3/runv3.go b/utils/runv3/runv3.go
index 66b21e1..6a8010b 100644
--- a/utils/runv3/runv3.go
+++ b/utils/runv3/runv3.go
@@ -26,6 +26,7 @@ import (
     "os/exec"
     "strings"
     "sync"
+    "time"
 
     "github.com/grafov/m3u8"
     "github.com/schollz/progressbar/v3"
@@ -358,55 +359,102 @@ func ExtMvData(keyAndUrls string, savePath string) error {
     }
     defer os.Remove(tempFile.Name())
     defer tempFile.Close()
-    // Download each link in turn and write it to the file
-    bar := progressbar.DefaultBytes(
-        -1,
-        "Downloading...",
-    )
+
+    // Context used to cancel all download tasks
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    // Initialize the progress bar
+    bar := progressbar.DefaultBytes(-1, "Downloading...")
     barWriter := io.MultiWriter(tempFile, bar)
+
+    // Create all pipes up front
     pipeReaders := make([]*io.PipeReader, len(urls))
-    var wg sync.WaitGroup
-    // At most 5 concurrent download requests
-    sem :=make(chan int, 5)
-    go func(pipeReaders []*io.PipeReader) {
-        for i, url := range urls {
-            pr, pw := io.Pipe()
-            pipeReaders[i] = pr
-            sem <- 1
-            wg.Add(1)
-            go func(i int, url string, pw *io.PipeWriter) {
-                //fmt.Printf("goroutine %d started\n", i)
-                defer wg.Done()
-                resp, err := http.Get(url)
-                if err != nil {
-                    // On error, notify the downstream reader via CloseWithError
-                    pw.CloseWithError(err)
-                    fmt.Printf("failed to download %s: %v\n", url, err)
-                    return
-                }
-                defer resp.Body.Close()
-                // Stream the HTTP response body out through the pipe
-                _, err = io.Copy(pw, resp.Body)
-                // Pass any error on to the pipe
-                pw.CloseWithError(err)
-            }(i, url, pw)
+    pipeWriters := make([]*io.PipeWriter, len(urls))
+    for i := range urls {
+        pr, pw := io.Pipe()
+        pipeReaders[i] = pr
+        pipeWriters[i] = pw
+    }
+
+    // Limit concurrency (empty struct to save memory)
+    sem := make(chan struct{}, 5)
+    var wg sync.WaitGroup
+
+    // HTTP client with a timeout
+    client := &http.Client{
+        Timeout: 30 * time.Second,
+    }
+
+    // Start the download tasks
+    go func() {
+        for i, url := range urls {
+            select {
+            case <-ctx.Done():
+                return // context cancelled, stop scheduling downloads
+            default:
+                sem <- struct{}{} // acquire a semaphore slot
+                wg.Add(1)
+
+                go func(i int, url string, pw *io.PipeWriter) {
+                    defer func() {
+                        <-sem // release the semaphore slot
+                        wg.Done()
+                    }()
+
+                    // Build a request bound to the context
+                    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+                    if err != nil {
+                        pw.CloseWithError(err)
+                        fmt.Printf("failed to create request: %v\n", err)
+                        return
+                    }
+
+                    resp, err := client.Do(req)
+                    if err != nil {
+                        pw.CloseWithError(err)
+                        fmt.Printf("download failed: %v\n", err)
+                        return
+                    }
+                    defer resp.Body.Close()
+
+                    // Check the HTTP status code
+                    if resp.StatusCode != http.StatusOK {
+                        err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+                        pw.CloseWithError(err)
+                        fmt.Printf("download failed: %v\n", err)
+                        return
+                    }
+
+                    // Copy the response body into the pipe
+                    if _, err := io.Copy(pw, resp.Body); err != nil {
+                        pw.CloseWithError(err)
+                    } else {
+                        pw.Close() // normal close
+                    }
+                }(i, url, pipeWriters[i])
+            }
         }
-    }(pipeReaders)
-    // Read each pipe in order and write its data to the file
-    for i := range len(urls) {
-        <-sem
-        //fmt.Printf("writing part %d\n", i)
+    }()
+
+    // Write to the file in order
+    for i := 0; i < len(urls); i++ {
         if _, err := io.Copy(barWriter, pipeReaders[i]); err != nil {
+            cancel() // cancel all download tasks
             fmt.Printf("failed to write part %d: %v\n", i+1, err)
             return err
         }
-        pipeReaders[i].Close() // close the reader promptly
-        //fmt.Printf("part %d written\n", i)
+        pipeReaders[i].Close() // close the current reader
     }
     // Wait for all download tasks to finish
     wg.Wait()
-    tempFile.Close()
+
+    // Close the file explicitly (the deferred Close runs again, but the duplicate close is harmless)
+    if err := tempFile.Close(); err != nil {
+        fmt.Printf("failed to close temp file: %v\n", err)
+        return err
+    }
     fmt.Println("\nDownloaded.")
     cmd1 := exec.Command("mp4decrypt", "--key", key, tempFile.Name(), filepath.Base(savePath))