OpenTAP WorkQueue:异步提交与串行消费

背景

OpenTAP 里有一类典型需求:调用方不想被阻塞,但执行顺序又不能乱。比如结果分发、延后动作、设备地址探测。如果直接并发跑,很容易出现状态竞争;如果全同步,又会拖慢主流程。WorkQueue就是在这两者之间做平衡的基础组件。

框架分析

WorkQueue采用“多线程入队 + 单线程出队”模型。核心结构很直接:ConcurrentQueue<IInvokable> 存任务,SemaphoreSlim 提供可消费计数,threadCount + lock 保证每个队列最多一个 worker。这样入口可以快速返回,出口保持严格串行。

另外有两个关键选项:LongRunning 控制线程是否常驻,TimeAveraging 统计平均处理耗时。后者在 TestPlanRun 里用于估算结果监听器的队列延迟并触发节流。

实现过程

EnqueueWork() 的顺序是:countdown++ → 入队 → addSemaphore.Release() → 若无 worker 则启动 TapThread.Start(WorkerFunction)WorkerFunction() 被信号唤醒后循环出队执行,完成后 countdown--。普通模式下,队列空闲超过 Timeout(默认 5 秒)会退出线程;LongRunning 模式则继续驻留。

可复现命令:

1
2
cd /home/ops/clawd/repos/opentap
grep -R "new WorkQueue\|class WorkQueue" -n Engine | head -n 20

这条命令可以直接看到 WorkQueue 定义及在 TestPlanRunResultProxyVisaDeviceDiscovery 的主要落点。

注意事项

第一,WorkQueue只保证“单队列内串行”,不同队列之间仍会并发。第二,Wait() 依赖 countdown 轮询等待,适合低频同步点,不建议当高频屏障使用。第三,LongRunning 队列要显式 Dispose(),否则线程会长期保留。第四,代码里对 Semaphore 计数上限做了保护,极端洪峰下会短暂 Sleep,这是故意的背压设计。

小结

WorkQueue没有复杂调度算法,但它把边界切得很准:入口异步、出口串行、空闲可回收。对测试框架这类“正确性优先”的系统来说,这种朴素但稳定的实现往往比激进并发更可靠。

关键源码路径:

  • Engine/WorkQueue.cs
  • Engine/TestPlanRun.cs
  • Engine/ResultProxy.cs
  • Engine/VisaDeviceDiscovery.cs