class Fiber::Scheduler
这不是一个实际存在的类,而是为了说明 Scheduler
对象为了作为 Fiber.scheduler
的参数并处理非阻塞纤程而应该遵守的接口。另请参阅 Fiber
类文档中的“非阻塞纤程”部分,以了解一些概念的解释。
Scheduler 的行为和用法预计如下:
-
当非阻塞
Fiber
中的执行到达某些阻塞操作(如睡眠、等待进程或未就绪的 I/O)时,它会调用下面列出的调度器的钩子方法。 -
Scheduler
以某种方式注册当前纤程正在等待的内容,并使用Fiber.yield
将控制权移交给其他纤程(因此纤程会在等待其等待结束时暂停,并且同一线程中的其他纤程可以执行) -
在当前线程执行结束时,调度器的方法 scheduler_close 会被调用
-
调度器进入等待循环,检查所有阻塞的纤程(它在钩子调用中注册了这些纤程),并在等待的资源就绪时(例如,I/O 就绪或睡眠时间过去)恢复它们。
这样,对于每个单独的 Fiber 代码,都可以透明地实现并发执行。
Scheduler
的实现由 gems 提供,例如 Async。
钩子方法包括:
-
io_wait
,io_read
,io_write
,io_pread
,io_pwrite
, 以及io_select
, io_close -
(随着 Ruby 开发人员创建更多具有非阻塞调用的方法,列表会扩展)
除非另有说明,否则钩子实现是强制性的:如果它们没有实现,尝试调用钩子的方法将会失败。为了提供向后兼容性,未来的钩子将是可选的(如果由于为较旧的 Ruby 版本创建调度器而未实现,则需要此钩子的代码将不会失败,并且只会以阻塞方式运行)。
强烈建议调度器实现 fiber
方法,该方法由 Fiber.schedule
委托。
可以在 Ruby 代码的 test/fiber/scheduler.rb
中找到调度器的示例玩具实现。
公共实例方法
来源
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname) { VALUE arguments[] = { hostname }; return rb_check_funcall(scheduler, id_address_resolve, 1, arguments); }
由任何执行非反向 DNS 查找的方法调用。最著名的方法是 Addrinfo.getaddrinfo
,但还有许多其他方法。
该方法应该返回一个字符串数组,这些字符串对应于 hostname
解析到的 IP 地址,如果无法解析则返回 nil
。
所有可能的调用站点的相当详尽的列表
-
Addrinfo.marshal_load
来源
VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout) { return rb_funcall(scheduler, id_block, 2, blocker, timeout); }
由诸如 Thread.join
之类的方法以及 Mutex 调用,以表示当前 Fiber
被阻塞,直到进一步通知(例如 unblock
)或直到 timeout
过期。
blocker
是我们正在等待的内容,仅供参考(用于调试和日志记录)。对其值没有任何保证。
预计返回布尔值,指定阻塞操作是否成功。
来源
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state) { struct rb_blocking_operation_wait_arguments arguments = { .function = function, .data = data, .unblock_function = unblock_function, .data2 = data2, .flags = flags, .state = state }; VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_operation_wait_proc, (VALUE)&arguments); return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc); }
由 Ruby 的核心方法调用,以非阻塞方式运行阻塞操作。
建议的最小实现是:
def blocking_operation_wait(work) Thread.new(&work).join end
来源
VALUE rb_fiber_scheduler_close(VALUE scheduler) { RUBY_ASSERT(ruby_thread_has_gvl_p()); VALUE result; // The reason for calling `scheduler_close` before calling `close` is for // legacy schedulers which implement `close` and expect the user to call // it. Subsequently, that method would call `Fiber.set_scheduler(nil)` // which should call `scheduler_close`. If it were to call `close`, it // would create an infinite loop. result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL); if (!UNDEF_P(result)) return result; result = rb_check_funcall(scheduler, id_close, 0, NULL); if (!UNDEF_P(result)) return result; return Qnil; }
当当前线程退出时调用。为了允许所有等待的纤程完成其执行,调度器需要实现此方法。
建议的模式是在 close
方法中实现主事件循环。
Fiber.schedule
的实现。该方法应该立即在单独的非阻塞纤程中运行给定的代码块,并返回该 Fiber
。
建议的最小实现是:
def fiber(&block) fiber = Fiber.new(blocking: false, &block) fiber.resume fiber end
来源
VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; return rb_check_funcall(scheduler, id_io_pread, 5, arguments); }
由 IO#pread
或 IO::Buffer#pread
调用,以从 io
中从偏移量 from
处读取 length
个字节到指定的 buffer
(请参阅 IO::Buffer
)中的给定 offset
处。
此方法在语义上与 io_read
相同,但它允许指定要读取的偏移量,并且通常更适合在同一文件上进行异步 IO
。
该方法应被视为实验性的。
来源
VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments); }
由 IO#pwrite
或 IO::Buffer#pwrite
调用,以将 length
个字节从指定的 buffer
(请参阅 IO::Buffer
)中的给定 offset
处写入到 io
中的偏移量 from
处。
此方法在语义上与 io_write
相同,但它允许指定要写入的偏移量,并且通常更适合在同一文件上进行异步 IO
。
该方法应被视为实验性的。
来源
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; return rb_check_funcall(scheduler, id_io_read, 4, arguments); }
由 IO#read
或 IO#Buffer.read 调用,以从 io
中读取 length
个字节到指定的 buffer
(请参阅 IO::Buffer
)中的给定 offset
处。
length
参数是“要读取的最小长度”。如果 IO
缓冲区的大小为 8KiB,但 length
为 1024
(1KiB),则可能会读取最多 8KiB,但至少会读取 1KiB。通常,只有在读取数据时发生错误时,才会读取少于 length
的数据。
指定 length
为 0 是有效的,表示尝试至少读取一次并返回任何可用数据。
建议的实现应尝试以非阻塞方式从 io
读取,如果 io
未就绪,则调用 io_wait
(这将把控制权交给其他纤程)。
请参阅 IO::Buffer
,了解可用于返回数据的接口。
预计返回读取的字节数,或者,如果发生错误,则返回 -errno
(对应于系统错误代码的负数)。
该方法应被视为实验性的。
来源
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout) { VALUE arguments[] = { readables, writables, exceptables, timeout }; return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments); }
由 IO.select
调用,以询问指定的描述符是否已准备好在指定的 timeout
内用于指定事件。
预计返回已准备好的 IO 的 3 元组 Array
。
来源
VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout) { return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout); }
由 IO#wait
, IO#wait_readable
, IO#wait_writable
调用,以询问指定的描述符是否已准备好在指定的 timeout
内用于指定事件。
events
是 IO::READABLE
、IO::WRITABLE
和 IO::PRIORITY
的位掩码。
建议的实现应该注册哪个 Fiber
正在等待哪个资源,并立即调用 Fiber.yield
以将控制权传递给其他纤程。然后,在 close
方法中,调度器可能会将所有 I/O 资源分派给等待它的纤程。
预计返回立即准备好的事件的子集。
来源
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; return rb_check_funcall(scheduler, id_io_write, 4, arguments); }
由 IO#write
或 IO::Buffer#write
调用,以从指定的 buffer
(请参阅 IO::Buffer
)中的给定 offset
处将 length
个字节写入到 io
。
length
参数是“要写入的最小长度”。如果 IO
缓冲区的大小为 8KiB,但指定的 length
为 1024 (1KiB),则最多写入 8KiB,但至少写入 1KiB。通常,只有在写入数据时发生错误时,才会写入少于 length
的数据。
指定 length
为 0 是有效的,表示尝试至少写入一次,尽可能多地写入数据。
建议的实现应尝试以非阻塞方式写入到 io
,如果 io
未就绪,则调用 io_wait
(这将把控制权交给其他纤程)。
请参阅 IO::Buffer
,了解可用于从缓冲区高效获取数据的接口。
预计返回写入的字节数,或者,如果发生错误,则返回 -errno
(对应于系统错误代码的负数)。
该方法应被视为实验性的。
来源
VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout) { return rb_funcall(scheduler, id_kernel_sleep, 1, timeout); }
由 Kernel#sleep
和 Mutex#sleep 调用,并希望提供以非阻塞方式休眠的实现。实现可能会在“哪个纤程等待到哪个时刻”的列表中注册当前纤程,调用 Fiber.yield
以传递控制权,然后在 close
中恢复等待时间已过的纤程。
来源
VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags) { VALUE arguments[] = { PIDT2NUM(pid), RB_INT2NUM(flags) }; return rb_check_funcall(scheduler, id_process_wait, 2, arguments); }
由 Process::Status.wait
调用,以等待指定的进程。有关参数描述,请参阅该方法的描述。
建议的最小实现
Thread.new do Process::Status.wait(pid, flags) end.value
此钩子是可选的:如果当前调度器中不存在此钩子,Process::Status.wait
将作为阻塞方法运行。
预计返回 Process::Status
实例。
来源
VALUE rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message) { VALUE arguments[] = { timeout, exception, message }; return rb_check_funcall(scheduler, id_timeout_after, 3, arguments); }
由 Timeout.timeout
调用,以在给定的 duration
内执行给定的 block
。它也可以由调度器或用户代码直接调用。
如果可能,尝试将给定 block
的执行时间限制在给定 duration
内。当非阻塞操作导致 block
的执行时间超过指定的 duration
时,应通过引发使用给定 exception_arguments
构造的指定 exception_class
来中断该非阻塞操作。
通常认为全局执行超时是具有风险的。此实现只会中断非阻塞操作。这是有意为之的设计,因为非阻塞操作可能由于各种不可预测的原因而失败,因此应用程序应该已经能够健壮地处理这些情况,并且隐含地处理超时。
然而,由于这种设计,如果 block
没有调用任何非阻塞操作,则将无法中断它。如果您希望为超时提供可预测的点,请考虑添加 +sleep(0)+。
如果代码块执行成功,其结果将被返回。
异常通常会使用 Fiber#raise
抛出。
来源
VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber) { RUBY_ASSERT(rb_obj_is_fiber(fiber)); return rb_funcall(scheduler, id_unblock, 2, blocker, fiber); }
被调用以唤醒之前使用 block
阻塞的 Fiber
(例如,Mutex#lock 调用 block
,而 Mutex#unlock 调用 unblock
)。调度器应使用 fiber
参数来了解哪个纤程被解除阻塞。
blocker
是等待的对象,但它仅供参考(用于调试和日志记录),并且不保证与 block
的 blocker
值相同。