fix: 下载协程阻塞

This commit is contained in:
itouakirai
2025-03-06 08:38:15 +08:00
parent 8f284cadea
commit 65172a3d3d

View File

@@ -26,6 +26,7 @@ import (
"os/exec" "os/exec"
"strings" "strings"
"sync" "sync"
"time"
"github.com/grafov/m3u8" "github.com/grafov/m3u8"
"github.com/schollz/progressbar/v3" "github.com/schollz/progressbar/v3"
@@ -358,55 +359,102 @@ func ExtMvData(keyAndUrls string, savePath string) error {
} }
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
defer tempFile.Close() defer tempFile.Close()
// 依次下载每个链接并写入文件
bar := progressbar.DefaultBytes( // 创建上下文用于取消所有下载任务
-1, ctx, cancel := context.WithCancel(context.Background())
"Downloading...", defer cancel()
)
// 初始化进度条
bar := progressbar.DefaultBytes(-1, "Downloading...")
barWriter := io.MultiWriter(tempFile, bar) barWriter := io.MultiWriter(tempFile, bar)
// 预先创建所有管道
pipeReaders := make([]*io.PipeReader, len(urls)) pipeReaders := make([]*io.PipeReader, len(urls))
var wg sync.WaitGroup pipeWriters := make([]*io.PipeWriter, len(urls))
//最多同时5个下载请求 for i := range urls {
sem :=make(chan int, 5)
go func(pipeReaders []*io.PipeReader) {
for i, url := range urls {
pr, pw := io.Pipe() pr, pw := io.Pipe()
pipeReaders[i] = pr pipeReaders[i] = pr
sem <- 1 pipeWriters[i] = pw
}
// 控制并发数(使用空结构体节省内存)
sem := make(chan struct{}, 5)
var wg sync.WaitGroup
// 创建带超时的HTTP Client
client := &http.Client{
Timeout: 30 * time.Second,
}
// 启动下载任务
go func() {
for i, url := range urls {
select {
case <-ctx.Done():
return // 上下文已取消,直接返回
default:
sem <- struct{}{} // 获取信号量
wg.Add(1) wg.Add(1)
go func(i int, url string, pw *io.PipeWriter) { go func(i int, url string, pw *io.PipeWriter) {
//fmt.Printf("协程 %d 开始\n", i) defer func() {
defer wg.Done() <-sem // 释放信号量
resp, err := http.Get(url) wg.Done()
}()
// 创建带上下文的请求
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil { if err != nil {
// 出错时,通过 CloseWithError 通知后续读取端
pw.CloseWithError(err) pw.CloseWithError(err)
fmt.Printf("下载 %s 失败: %v\n", url, err) fmt.Printf("创建请求失败: %v\n", err)
return
}
resp, err := client.Do(req)
if err != nil {
pw.CloseWithError(err)
fmt.Printf("下载失败: %v\n", err)
return return
} }
defer resp.Body.Close() defer resp.Body.Close()
// 将 HTTP 响应体通过 pipe 写出(实现流式传输)
_, err = io.Copy(pw, resp.Body) // 检查HTTP状态码
// 将可能的错误传递给 pipe if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("非200状态码: %d", resp.StatusCode)
pw.CloseWithError(err) pw.CloseWithError(err)
}(i, url, pw) fmt.Printf("下载失败: %v\n", err)
return
} }
}(pipeReaders)
// 按顺序读取每个 pipe 的数据并写入文件 // 将响应体复制到管道
for i := range len(urls) { if _, err := io.Copy(pw, resp.Body); err != nil {
<-sem pw.CloseWithError(err)
//fmt.Printf("写入 %d 开始\n", i) } else {
pw.Close() // 正常关闭
}
}(i, url, pipeWriters[i])
}
}
}()
// 按顺序写入文件
for i := 0; i < len(urls); i++ {
if _, err := io.Copy(barWriter, pipeReaders[i]); err != nil { if _, err := io.Copy(barWriter, pipeReaders[i]); err != nil {
cancel() // 取消所有下载任务
fmt.Printf("写入第 %d 部分失败: %v\n", i+1, err) fmt.Printf("写入第 %d 部分失败: %v\n", i+1, err)
return err return err
} }
pipeReaders[i].Close() // 及时关闭 read pipeReaders[i].Close() // 关闭当前读取端
//fmt.Printf("写入 %d 成功\n", i)
} }
// 等待所有下载任务完成 // 等待所有下载任务完成
wg.Wait() wg.Wait()
tempFile.Close()
// 显式关闭文件defer会再次调用但重复关闭是安全的
if err := tempFile.Close(); err != nil {
fmt.Printf("关闭临时文件失败: %v\n", err)
return err
}
fmt.Println("\nDownloaded.") fmt.Println("\nDownloaded.")
cmd1 := exec.Command("mp4decrypt", "--key", key, tempFile.Name(), filepath.Base(savePath)) cmd1 := exec.Command("mp4decrypt", "--key", key, tempFile.Name(), filepath.Base(savePath))