分享免费的编程资源和教程

网站首页 > 技术教程 正文

Elixir实战:10 超越 GenServer (1)任务(Task)

goqiw 2025-01-20 12:55:33 技术教程 56 ℃ 0 评论

本章涵盖

  • 任务
  • 代理人
  • ETS 表格

第 8 章和第 9 章介绍了工作进程和监督进程之间的区别。工作进程是提供服务某部分的进程,而监督进程则将工作进程组织成树形结构。这使您能够按所需顺序启动和停止进程,并在关键进程失败时重新启动它们。

如第 9.1.6 节所述,所有直接由监督者启动的进程应为 OTP 合规进程。使用普通 spawn 和 spawn_link 启动的进程不符合 OTP 标准,因此您应避免在生产环境中运行此类进程。模块如 Supervisor 、 GenServer 和 Registry 允许您启动可以放入监督树中的 OTP 合规进程。

在本章中,您将了解两个额外的模块,它们也允许您运行符合 OTP 的工作者: Task 和 Agent 。当您需要运行一次性任务时,任务非常有用,而代理可以用于管理状态并提供对其的并发访问。最后,我们将讨论一个相关的特性,称为 ETS 表,在某些条件下,它可以作为 GenServer 和 Agent 的更高效替代方案。还有很多新内容要覆盖,所以让我们先讨论任务。

10.1 任务

Task 模块可以用来并发运行一个作业——一个接受某些输入、执行某些计算然后停止的过程。从这个意义上说,任务驱动的过程与服务器过程的流动不同。虽然 GenServer 过程充当一个长时间运行的服务器,但 Task 驱动的过程会立即开始工作,不会处理请求,并在工作完成时停止。

Task 模块可以以两种不同的方式使用,具体取决于任务过程是否需要将结果发送回启动它的过程。前一种情况也称为等待任务,因为启动过程会等待任务发送结果回来。让我们先讨论这个选项。

10.1.1 待处理任务

一个等待的任务是一个执行某个功能的过程,将功能结果发送回启动过程,然后终止。让我们看一个基本的例子。

假设您想启动一个并发的、可能长时间运行的作业并获取其结果。您可以使用以下函数来模拟一个长时间运行的作业:

iex(1)> long_job =
          fn ->
            Process.sleep(2000)
            :some_result
          end

这个 lambda 在被调用时,会睡眠 2 秒钟,然后返回 :some_result 。

要并发运行此 lambda,您可以使用 Task.async/1 :

iex(2)> task = Task.async(long_job)

Task.async/1 函数接受一个零参数的 lambda,生成一个独立的进程,并在生成的进程中调用该 lambda。lambda 的返回值将作为消息发送回启动进程。

因为计算在一个单独的进程中运行, Task.async/1 会立即返回,即使 lambda 本身需要很长时间才能完成。这意味着启动进程不会被阻塞,可以与任务进程并发地执行一些额外的工作。

Task.async/1 的返回值是一个描述正在运行的任务的结构体。这个结构体可以传递给 Task.await/1 以等待任务的结果:

iex(3)> Task.await(task)
:some_result

函数 Task.await/1 等待来自任务处理的响应消息。该消息将包含 lambda 的结果。当消息到达时, Task.await/1 返回 lambda 的结果。如果消息在 5 秒内未到达, Task.await/1 将引发异常。您可以将不同的超时作为第二个参数提供给 Task.await/2 。

等待的任务在您需要运行几个相互独立的一次性计算并等待所有结果时非常有用。为了说明这一点,我们将重用第 5.2.2 节中的示例。在该示例中,您需要执行多个独立的查询并收集所有结果。由于查询是相互独立的,您可以通过在单独的进程中运行每个查询并将结果作为消息发送到启动进程来提高总执行时间。然后,启动进程需要等待所有结果。

在第五章中,您从头开始实现了这一点,使用了 spawn 、 send 和 receive 。在这里,您将依赖 Task.async/1 和 Task.await/1 。

首先,定义一个辅助 lambda,用于模拟长时间运行的查询执行:

iex(1)> run_query =
          fn query_def ->
            Process.sleep(2000)
            "#{query_def} result"
          end

现在,您可以开始五个查询,每个查询在一个单独的任务中进行:

iex(2)> queries = 1..5
 
iex(3)> tasks =
          Enum.map(
            queries,
            &Task.async(fn -> run_query.("query #{&1}") end)
          )

在这里,您创建五个查询,然后在单独的任务中启动每个查询的执行。 tasks 变量中的结果是一个包含五个 %Task{} 结构的列表,每个结构描述一个执行查询的任务。

要等待所有结果,您将 tasks 变量中的每个任务传递给 Task.await/1 函数:

iex(4)> Enum.map(tasks, &Task.await/1)
["query 1 result", "query 2 result", "query 3 result", "query 4 result",
 "query 5 result"]

使用管道操作符,您可以以稍微简短的方式编写此代码:

iex(5)> 1..5
        |> Enum.map(&Task.async(fn -> run_query.("query #{&1}") end))
        |> Enum.map(&Task.await/1)
 
["query 1 result", "query 2 result", "query 3 result", "query 4 result",
 "query 5 result"]      ?

? 2 秒后返回

所有结果在 2 秒内收集的事实证明每个任务都在一个单独的进程中运行。

此代码按照任务启动的顺序等待任务。因此,结果的顺序是确定的。结果列表的第一个元素是查询 1 的结果,第二个元素是查询 2 的结果,依此类推。

需要注意的是, Task.async/1 将新任务与启动进程链接在一起。因此,如果任何任务进程崩溃,启动进程也会崩溃(除非它捕获退出)。启动进程的崩溃将反过来导致由同一进程启动的所有其他任务崩溃。换句话说,使用 Task.async/1 启动多个任务具有全有或全无的语义。单个任务的崩溃会导致所有其他任务以及启动进程一起崩溃。

如果您想明确处理单个任务的失败,您需要在启动进程中捕获退出并处理相应的退出消息。 Task 模块中有一些可用的函数可以帮助您,最显著的是 Task.async_stream/3 。您可以参考官方文档 https://hexdocs.pm/elixir/Task.xhtml 获取更多详细信息。与此同时,让我们看看当启动进程不需要等待任务结果时,您如何处理任务。

10.1.2 非等待任务

如果您不想将结果消息发送回启动进程,可以使用 Task.start_link/1 。这个函数可以被视为一个符合 OTP 的包装器,围绕普通的 spawn_link 。该函数启动一个单独的进程并将其链接到调用者。然后,提供的 lambda 在启动的进程中执行。一旦 lambda 完成,进程将以原因 :normal 终止。与 Task.async/1 不同, Task.start_link/1 不会向启动进程发送任何消息。以下是一个基本示例:

iex(1)> Task.start_link(fn ->
          Process.sleep(1000)
          IO.puts("Hello from task")
        end)
 
{:ok, #PID<0.89.0>}   ?
 
Hello from task!      ?

? Task.start_link/1 的结果

? 打印延迟 1 秒

让我们看一个更具体的例子。假设您想收集一些关于系统的指标并定期报告。这是一个非响应式作业的例子。您在这里并不需要一个 GenServer ,因为您不需要处理来自其他客户端进程的请求。相反,您想要一个进程,它会休眠一段时间,然后收集相关指标并进行报告。

让我们开始在你的待办事项系统中实现这个。首先,你将实现一个顺序循环,定期收集指标并将其打印到屏幕上。

清单 10.1 报告系统指标 (todo_metrics/lib/todo/metrics.ex)

defmodule Todo.Metrics do
  ...
 
  defp loop() do
    Process.sleep(:timer.seconds(10))
    IO.inspect(collect_metrics())
    loop()
  end
 
  defp collect_metrics() do
    [
      memory_usage: :erlang.memory(:total),
      process_count: :erlang.system_info(:process_count)
    ]
  end
end

在现实生活中,您可能希望收集更多数据并将其发送到外部服务,但这个例子保持简单。

您想将此循环作为系统的一部分运行。为此,您需要启动一个任务。

清单 10.2 作为任务的指标报告器 (todo_metrics/lib/todo/metrics.ex)

defmodule Todo.Metrics do
  use Task
 
  def start_link(_arg), do: Task.start_link(&loop/0)
 
  ...
end

首先,您指定 use Task ,这将把 child_spec/1 函数注入到 Todo.Metrics 模块中。与 GenServer 一样,注入的规范将调用 start_link/1 ,因此您需要定义 start_link/1 ,即使您不使用该参数。 start_link/1 的实现简单地调用 Task.start_link/1 来启动一个任务处理过程,其中循环正在运行。

通过这两行简单的代码, Todo.Metrics 模块已准备好注入到监督树中。

清单 10.3 启动一个监督指标任务 (todo_metrics/lib/todo/system.ex)

defmodule Todo.System do
  def start_link do
    Supervisor.start_link(
      [
        Todo.Metrics,
        ...
      ],
      strategy: :one_for_one
    )
  end
end

这就是 Task.start_link/1 的主要目的——它允许您启动一个符合 OTP 的进程,您可以安全地将其作为某个监视器的子进程运行。

试试看:

$ iex -S mix
 
iex(1)> Todo.System.start_link()
 
[memory_usage: 48110864, process_count: 74]   ?
[memory_usage: 48505592, process_count: 74]   ?

? 打印在 10 秒后

? 打印在 20 秒后

这是一种在您的系统中实现周期性作业的简单方法,无需运行多个操作系统进程和使用外部调度程序,例如 cron 。

在更复杂的场景中,值得将调度与作业逻辑分开。这个想法是使用一个进程进行周期性调度,然后在一个单独的临时进程中启动每个作业实例。这种方法提高了容错性,因为作业进程的崩溃不会干扰调度进程。你可以尝试将这种方法作为练习来实现,但在生产环境中,最好依赖经过实战检验的第三方库,例如 Quantum (https://github.com/quantum-elixir/quantum-core)。

10.1.3 监督动态任务

在许多情况下,您会希望动态地启动非预期任务。一个常见的例子是,当您需要在处理网络请求时与远程服务(例如支付网关)进行通信。

一种简单的方法是同步进行这种通信,同时处理请求。然而,这种方法可能导致用户体验不佳。如果存在一些间歇性的网络问题,与远程服务的通信可能会很慢,或者可能完全失败。

一种更好的方法是从一个单独的任务进程异步执行此通信。您接受传入请求,启动一个与远程服务通信的任务,并立即回应请求已被接受。一旦任务完成,您会发出关于结果的通知,可能通过 WebSocket 或电子邮件。这提高了系统的响应能力,并增强了系统对各种网络问题的弹性。您可以在更长的时间内重试失败的通信,而无需阻塞请求处理程序——甚至可以保持用户连接。

这是一个动态启动的独立任务的示例。该任务根据需要启动,其生命周期必须与启动它的进程(处理传入请求的进程)的生命周期解耦。

在这种情况下,最好在专用的监督者下运行任务。您可以为此使用 DynamicSupervisor ,但 Elixir 包含一个名为 Task.Supervisor 的任务特定包装器(https://hexdocs.pm/elixir/Task.Supervisor.xhtml)。

要运行动态监督的任务,请启动任务监督器:

iex(1)> Task.Supervisor.start_link(name: MyTaskSupervisor)

现在,您可以使用 Task.Supervisor.start_child/2 在该主管下开始一个任务:

iex(2)> Task.Supervisor.start_child(
          MyTaskSupervisor,
          fn ->
            IO.puts("Task started")
            Process.sleep(2000)
            IO.puts("Task stopping")
          end
        )
 
{:ok, #PID<0.118.0>}   ?
 
Task started           ?
Task stopping          ?

? start_child 的结果

? 立即打印

? 2 秒后打印

理解逻辑启动过程和实际启动过程之间的区别是很重要的。外壳进程是启动任务创建的进程。然而,任务实际上是作为任务监督者的子进程启动的。由于这种进程结构,逻辑启动器( iex 外壳进程)和任务的生命周期是分开的。一个进程的崩溃不会影响另一个进程。

这结束了我们对任务的简要介绍。我们没有涵盖所有的细微差别,因此我建议您更详细地研究官方模块文档,网址为 https://hexdocs.pm/elixir/Task.xhtml。接下来,我们将看看代理。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表