持久函数:扇出扇入模式

Avatar of Sarah Drasner
Sarah Drasner

DigitalOcean 为您旅程的每个阶段提供云产品。 立即开始使用 200 美元的免费积分!

这篇文章是我与我优秀的同事 Maxime Rouiller 合作完成的。

持久函数?什么?如果您不熟悉 Durable,我建议您 从这篇文章开始,它涵盖了所有基本知识,以便您可以深入了解。 在这篇文章中,我们将深入探讨一个特定的用例,以便您可以看到持久函数模式的应用!

今天,让我们来谈谈**扇出,扇入**模式。 我们将通过从 GitHub 获取打开的问题数,然后存储我们得到的结果来进行说明。 这是包含所有代码的仓库,我们将在本文中逐步介绍。

查看仓库

关于扇出/扇入模式

我们在上一篇文章中简要提到了这种模式,所以让我们回顾一下。 当您需要并行执行多个函数,然后对这些结果执行其他任务时,您可能会用到这种模式。 可以想象,这种模式对很多项目都很有用,因为我们经常需要根据来自几个其他来源的数据来做一件事。

例如,假设您是一家外卖餐厅,有大量的订单涌入。 您可能会使用这种模式首先获取订单,然后使用该订单计算所有商品的价格、商品的可用性,并查看是否有任何商品有促销或优惠。 也许促销/优惠不会与您的价格存储在同一个地方,因为它们是由外部销售公司控制的。 您可能还需要了解您的送货队列情况以及根据其位置,您的员工中谁应该负责送货。

这需要大量的协调! 但是您需要汇总所有这些信息才能完成订单并进行处理。 当然,这是一个简化且人为的例子,但您可以看到并发处理一些事情有多么有用,以便它们可以随后被一个最终函数使用。

以下是抽象代码和可视化表示

查看 CodePen 上 Sarah Drasner (@sdras) 编写的 持久函数:模式 #2,扇出,扇入

const df = require('durable-functions')

module.exports = df(function*(ctx) {
  const tasks = []

  // items to process concurrently, added to an array
  const taskItems = yield ctx.df.callActivityAsync('fn1')
  taskItems.forEach(item => tasks.push(ctx.df.callActivityAsync('fn2', item))
  yield ctx.df.task.all(tasks)

  // send results to last function for processing
  yield ctx.df.callActivityAsync('fn3', tasks)
})

现在我们已经了解了为什么要使用这种模式,让我们深入研究一个简化的示例,解释它是如何工作的。

设置您的环境以使用持久函数

首先。 我们必须准备好开发环境才能使用持久函数。 让我们将其分解。

GitHub 个人访问令牌

要运行此示例,您需要在 GitHub 中创建一个个人访问令牌。 如果您转到帐户照片下方,打开下拉菜单,然后选择“设置”,然后在左侧边栏中选择“开发者设置”。 在下一个屏幕的同一侧边栏中,点击“个人访问令牌”选项。

然后会弹出一个提示,您可以点击生成新令牌按钮。 您应该为您的令牌指定一个对该项目有意义的名称。 比如“持久函数比墨西哥卷饼好”。 您知道,像这样一些标准的东西。

对于范围/权限选项,我建议选择“存储库”,然后允许点击“生成令牌”按钮并将令牌复制到剪贴板。 请记住,您永远不要提交您的令牌。(如果您这样做,它将被撤销。问我为什么我知道。)如果您需要有关创建令牌的更多信息,请参考 此处提供的说明

Functions CLI

首先,我们将安装最新版本的 Azure Functions CLI。 我们可以通过在终端中运行以下命令来实现

npm i -g azure-functions-core-tools@core --unsafe-perm true

不安全权限标志是否让您感到不安? 我也曾经感到不安。 实际上,它所做的是在包脚本运行时阻止 UID/GID 切换,这是必要的,因为包本身是围绕 .NET 的 JavaScript 包装器。 也可以在没有此类标志的情况下使用 Brew 进行安装,并且 此处提供了更多相关信息

可选:在 VS Code 中设置项目

完全没有必要,但我喜欢在 VS Code 中使用 Azure 函数,因为它具有出色的本地调试功能,这在无服务器函数中通常很麻烦。 如果您还没有安装它,您可以在此处进行安装

为 Azure 设置免费试用并创建存储帐户

要运行此示例,您需要 试用 Azure 的免费试用版。 您可以进入门户并在左上角登录。 您将创建一个新的 Blob 存储帐户,并检索密钥。 既然我们已经完成了所有这些准备工作,我们就可以开始了!

设置我们的持久函数

让我们看看我们设置的仓库。 我们将克隆或分叉它

git clone https://github.com/Azure-Samples/durablefunctions-apiscraping-nodejs.git 

以下是初始文件结构的样子。

file structure for the durable function repo

(此可视化图是 使用我的 CLI 工具创建的。)

local.settings.json 中,将 GitHubToken 更改为之前从 GitHub 获取的值,并对两个存储密钥执行相同的操作——粘贴之前设置的存储帐户中的密钥。

然后运行

func extensions install
npm i
func host start

现在我们正在本地运行!

理解协调程序

如您所见,我们在 FanOutFanInCrawler 目录中有多个文件夹。 列出的目录中的函数 GetAllRepositoriesForOrganizationGetAllOpenedIssuesSaveRepositories 是我们将要协调的函数。

以下是我们将要执行的操作

  • 协调程序将启动 GetAllRepositoriesForOrganization 函数,我们将在其中传入组织名称,该名称从 Orchestrator_HttpStart 函数的 getInput() 中检索
  • 由于这可能不止一个仓库,我们将首先创建一个空数组,然后遍历所有仓库并运行 GetOpenedIssues,并将它们推送到数组中。 我们在这里运行的所有内容都将并发执行,因为它不在迭代器中的 yield 中
  • 然后我们将等待所有任务执行完成,最后调用 SaveRepositories,它将所有结果存储在 Blob 存储中

由于其他函数相当标准,让我们花点时间深入研究一下协调程序。 如果我们查看协调程序目录内部,我们可以看到它具有一个相当传统的函数设置,具有 index.jsfunction.json 文件。

生成器

在我们深入研究编排器之前,让我们先对生成器进行一个非常简短的旁路介绍,因为如果没有它们,你就无法理解其余的代码。

生成器并不是编写此代码的唯一方法!它也可以通过其他异步 JavaScript 模式来实现。碰巧的是,这是一种非常简洁易读的编写方式,所以让我们快速了解一下。

function* generator(i) {
yield i++;
yield i++;
yield i++;
}

var gen = generator(1);

console.log(gen.next().value); // 1
console.log(gen.next().value); // 2
console.log(gen.next().value); // 3
console.log(gen.next()); // {value: undefined, done: true}

function*后面的初始小星号之后,您可以开始使用yield关键字。调用生成器函数不会完全执行整个函数;而是返回一个迭代器对象。next()方法将逐个遍历它们,我们会得到一个对象,该对象告诉我们valuedone——这将是一个布尔值,表示我们是否已完成遍历所有yield语句。您可以在上面的示例中看到,对于最后一个.next()调用,返回一个donetrue的对象,让我们知道我们已经遍历了所有值。

编排器代码

我们将从需要此功能才能正常工作的require语句开始。

const df = require('durable-functions')

module.exports = df(function*(context) {
  // our orchestrator code will go here
})

值得注意的是,那里的星号将创建一个迭代器函数。

首先,我们将从Orchestrator_HttpStart函数中获取组织名称,并使用GetAllRepositoriesForOrganization获取该组织的所有存储库。请注意,我们在存储库赋值中使用yield以使函数按顺序执行。

const df = require('durable-functions')

module.exports = df(function*(context) {
 var organizationName = context.df.getInput()
 var repositories = yield context.df.callActivityAsync(
   'GetAllRepositoriesForOrganization',
   organizationName
 )
})

然后,我们将创建一个名为output的空数组,从包含组织所有存储库的数组中创建一个for循环,并使用它将问题推送到数组中。请注意,我们在这里不使用yield,以便它们全部并发运行,而不是一个接一个地等待。

const df = require('durable-functions')

module.exports = df(function*(context) {
  var organizationName = context.df.getInput()
  var repositories = yield context.df.callActivityAsync(
   'GetAllRepositoriesForOrganization',
   organizationName
  )

 var output = []
 for (var i = 0; i < repositories.length; i++) {
   output.push(
     context.df.callActivityAsync('GetOpenedIssues', repositories[i])
   )
 }

})

最后,当所有这些执行完成后,我们将存储结果并将其传递给SaveRepositories函数,该函数会将它们保存到 Blob 存储中。然后,我们将返回实例的唯一 ID(context.instanceId)。

const df = require('durable-functions')

module.exports = df(function*(context) {
 var organizationName = context.df.getInput()
 var repositories = yield context.df.callActivityAsync(
   'GetAllRepositoriesForOrganization',
   organizationName
 )

 var output = []
 for (var i = 0; i < repositories.length; i++) {
   output.push(
     context.df.callActivityAsync('GetOpenedIssues', repositories[i])
   )
 }

 const results = yield context.df.Task.all(output)
 yield context.df.callActivityAsync('SaveRepositories', results)

 return context.instanceId
})

现在,我们拥有了使用此单个编排器管理所有函数所需的所有步骤!

部署

现在是有趣的部分。让我们部署!🚀

要部署组件,Azure 需要您安装 Azure CLI使用它登录

首先,您需要预配服务。查看提供的provision.ps1文件,以熟悉我们将要创建的资源。然后,您可以使用之前生成的 GitHub 令牌执行该文件,如下所示

.\provision.ps1 -githubToken <TOKEN> -resourceGroup <ResourceGroupName> -storageName <StorageAccountName> -functionName <FunctionName>

如果您不想安装 PowerShell,也可以获取provision.ps1中的命令并手动运行它。

就是这样!我们的 Durable Function 已经启动并运行。