🧪 Skills
Batch Processing Patterns
批量处理与长时任务编排模式。涵盖队列管理、并发调度、中断恢复、熔断器、远程任务轮询、进度报告和反风控策略。适用于批量文件处理、AI API 调
v1.0.0
Description
name: batch-processing-patterns version: 1.0.0 description: 批量处理与长时任务编排模式。涵盖队列管理、并发调度、中断恢复、熔断器、远程任务轮询、进度报告和反风控策略。适用于批量文件处理、AI API 调用、爬虫和后台任务场景。
批量处理与长时任务指南
来自生产级桌面应用的实战经验,覆盖批量文件处理、远程 API 轮询、并发控制和错误恢复。
适用场景
- 批量文件处理(转码、压缩、水印)
- 远程 AI 任务轮询(视频生成、语音合成)
- 爬虫/批量 HTTP 请求
- 后台队列任务
1. 批处理架构
任务队列
├── 并发调度器(动态调整并发数)
│ ├── Worker 1 → processItem()
│ ├── Worker 2 → processItem()
│ └── Worker N → processItem()
├── 中止控制器(shouldStop + 子进程清理)
├── 熔断器(连续失败 N 次暂停)
├── 跳过检查(断点续传 / 前置过滤)
└── 进度报告(per-item + overall)
核心原则
- 逐项处理,逐项报告 — 不等全部完成
- 中断即停 — 每个 item 之间检查 abort 信号
- 失败不中断 — 单项失败标记
failed,继续处理其他 - 熔断保护 — 连续失败超阈值暂停整个队列
2. 并发调度
自适应并发池
根据每个 item 的处理耗时动态调整并发数:
class AdaptiveScheduler {
private concurrency: number;
private running = 0;
private queue: (() => void)[] = [];
constructor(
private min: number,
private max: number,
private slowThresholdMs: number
) {
this.concurrency = Math.ceil((min + max) / 2);
}
async run<T>(fn: () => Promise<T>): Promise<T> {
if (this.running >= this.concurrency) {
await new Promise<void>((resolve) => this.queue.push(resolve));
}
this.running++;
const start = Date.now();
try {
return await fn();
} finally {
const elapsed = Date.now() - start;
this.running--;
// 自适应调整
if (elapsed > this.slowThresholdMs && this.concurrency > this.min) {
this.concurrency--;
} else if (elapsed < this.slowThresholdMs / 2 && this.concurrency < this.max) {
this.concurrency++;
}
if (this.queue.length > 0) {
this.queue.shift()!();
}
}
}
}
预设配置
| 场景 | 初始 | 最小 | 最大 | 慢阈值 |
|---|---|---|---|---|
| CPU 密集(FFmpeg 转码) | 4 | 1 | CPU 核数 | 3s |
| API 调用(AI 服务) | 3 | 1 | 5 | 8s |
| 文件 I/O | 2 | 1 | 3 | 30s |
| 串行(必须顺序) | 1 | 1 | 1 | - |
简易并发池(不需要自适应时)
async function runPool<T>(
items: T[],
fn: (item: T) => Promise<void>,
concurrency: number,
signal?: AbortSignal
): Promise<{ completed: number; failed: number }> {
let completed = 0, failed = 0;
const running = new Set<Promise<void>>();
for (const item of items) {
if (signal?.aborted) break;
const p = fn(item)
.then(() => { completed++; })
.catch(() => { failed++; });
running.add(p.then(() => { running.delete(p); }));
if (running.size >= concurrency) {
await Promise.race(running);
}
}
await Promise.all(running);
return { completed, failed };
}
3. 中断与恢复
AbortController 模式
class BatchAbortController {
private _aborted = false;
private callbacks: (() => void)[] = [];
get aborted() { return this._aborted; }
abort() {
if (this._aborted) return; // 幂等
this._aborted = true;
this.callbacks.forEach((cb) => cb());
}
onAbort(cb: () => void) {
if (this._aborted) { cb(); return; }
this.callbacks.push(cb);
}
reset() {
this._aborted = false;
this.callbacks = [];
}
}
断点续传(shouldSkip)
const completedSet = new Set(loadCompletedFromDisk());
function shouldSkip(item: FileItem): string | null {
if (completedSet.has(item.path)) return '已完成(断点续传)';
if (item.size <= targetSize) return '已满足目标条件';
return null; // 正常处理
}
// 在批处理循环中
for (const item of items) {
if (abortController.aborted) break;
const skipReason = shouldSkip(item);
if (skipReason) {
onItemSkip(item, skipReason);
continue;
}
try {
await processItem(item);
completedSet.add(item.path);
saveCompletedToDisk(completedSet); // 持久化进度
onItemComplete(item);
} catch (err) {
onItemError(item, err);
}
}
4. 熔断器
连续失败过多时自动暂停,避免无意义的重试浪费资源。
class CircuitBreaker {
private consecutiveFailures = 0;
constructor(
private maxFailures: number = 5,
private onTrip?: (failures: number) => void
) {}
recordSuccess() {
this.consecutiveFailures = 0;
}
recordFailure(): boolean {
this.consecutiveFailures++;
if (this.consecutiveFailures >= this.maxFailures) {
this.onTrip?.(this.consecutiveFailures);
return true; // tripped
}
return false;
}
get isTripped() {
return this.consecutiveFailures >= this.maxFailures;
}
reset() {
this.consecutiveFailures = 0;
}
}
5. 远程任务轮询
递归 setTimeout 模式(推荐)
function startPolling(taskId: string, interval = 5000) {
const poll = async () => {
try {
const result = await queryTaskStatus(taskId);
if (result.status === 'completed') {
onComplete(result);
return; // 停止轮询
}
if (result.status === 'failed') {
onFailed(result);
return;
}
// 超时检查
if (Date.now() - startTime > MAX_POLL_TIME) {
onTimeout(taskId);
return;
}
setTimeout(poll, interval); // 继续
} catch (err) {
if (isCriticalError(err)) return; // 停止
setTimeout(poll, interval); // 瞬态错误继续
}
};
poll();
}
关键要点
- 用
setTimeout而非setInterval— 确保前一次完成后再调度下一次 - 超时保护 — 总轮询时间有上限,避免永久轮询
- 错误分类 — 瞬态错误继续轮询,致命错误立即停止
- 去重 — 用 Set 防止同一任务重复轮询
批量轮询(多任务顺序轮询)
// 多任务并发轮询可能触发限流,改为顺序轮询
const pollAll = async () => {
const activeTasks = getActiveTasks();
for (const task of activeTasks) {
if (aborted) return;
await pollOne(task.id);
// 任务间延迟,尊重 QPS
if (activeTasks.length > 1) {
await sleep(2000);
}
}
if (getActiveTasks().length > 0) {
setTimeout(pollAll, POLL_INTERVAL);
}
};
6. 错误分类与处理
| 错误类型 | 行为 | 示例 |
|---|---|---|
| 瞬态网络错误 | 重试 | timeout, ECONNRESET |
| 401 Unauthorized | 停止 | API Key 无效 |
| 403 / 余额不足 | 停止 | 账户问题 |
| 429 Too Many Requests | 退避重试 | 限流 |
| 500+ Server Error | 有限重试(3 次) | 服务端异常 |
| 文件不存在 | 跳过此项 | 输入文件被删 |
| 磁盘空间不足 | 停止全部 | ENOSPC |
指数退避重试
async function withRetry<T>(
fn: () => Promise<T>,
maxRetries = 3,
baseDelay = 2000
): Promise<T> {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (err) {
if (attempt === maxRetries) throw err;
if (isFatalError(err)) throw err; // 不重试
const delay = baseDelay * Math.pow(2, attempt)
+ Math.random() * 1000; // 抖动
await sleep(delay);
}
}
throw new Error('unreachable');
}
7. 进度报告
双层进度
interface BatchProgress {
// 整体进度
current: number; // 已处理项数
total: number; // 总项数
percentage: number; // 0-100
// 当前项进度(可选)
currentItem?: {
name: string;
itemProgress: number; // 0-100
};
}
// 报告整体进度
function reportProgress(done: number, total: number) {
send({ current: done, total, percentage: Math.round(done / total * 100) });
}
进度节流
// GUI 场景必须节流,否则高频更新会卡 UI
const throttledProgress = throttle(reportProgress, 100); // 10fps
8. 反风控策略(批量 HTTP 请求)
三层防护
请求节流(同域名限速)→ 风控检测(响应分析)→ 指数退避重试
请求节流
const lastRequest = new Map<string, number>();
async function throttleByDomain(domain: string) {
const last = lastRequest.get(domain) || 0;
const minInterval = 1500 + Math.random() * 1500; // 1.5-3s 随机
const wait = minInterval - (Date.now() - last);
if (wait > 0) await sleep(wait);
lastRequest.set(domain, Date.now());
}
风控检测
即使 HTTP 200 也要检查响应内容:
- 验证码页面关键词(captcha, 滑块验证)
- 空壳页面(body < 500 字节)
- 重定向到登录页
UA 轮换
const USER_AGENTS = [
'Mozilla/5.0 (Macintosh; ...) Chrome/...',
'Mozilla/5.0 (Windows NT ...) Chrome/...',
'Mozilla/5.0 (iPhone; ...) Safari/...',
];
function randomUA(): string {
return USER_AGENTS[Math.floor(Math.random() * USER_AGENTS.length)];
}
9. Checklist
新增批处理功能
- 有 AbortController 中止机制
- 有并发控制(不超过 CPU 核数)
- 单项失败不中断整体
- 有熔断器(连续失败 5 次暂停)
- 进度通过节流后的回调报告
- 错误分类处理(瞬态 vs 致命)
- 临时文件在 finally 中清理
远程任务轮询
- 用 setTimeout 而非 setInterval
- 有超时上限
- 有去重机制
- 401/403 立即停止轮询
- 429 指数退避
批量 HTTP 请求
- 同域名请求间隔 ≥ 1.5s
- 检测风控响应(captcha/空页面)
- 退避重试(2s → 4s → 8s)
来源
提炼自生产级 Electron 桌面应用,覆盖 6+ 个批处理模块和 3 个远程轮询场景的实战经验。
Reviews (0)
Sign in to write a review.
No reviews yet. Be the first to review!
Comments (0)
No comments yet. Be the first to share your thoughts!