这篇文章是我与我优秀的同事 Maxime Rouiller 合作完成的。
持久函数?什么?如果您不熟悉 Durable,我建议您 从这篇文章开始,它涵盖了所有基本知识,以便您可以深入了解。 在这篇文章中,我们将深入探讨一个特定的用例,以便您可以看到持久函数模式的应用!
今天,让我们来谈谈**扇出,扇入**模式。 我们将通过从 GitHub 获取打开的问题数,然后存储我们得到的结果来进行说明。 这是包含所有代码的仓库,我们将在本文中逐步介绍。
关于扇出/扇入模式
我们在上一篇文章中简要提到了这种模式,所以让我们回顾一下。 当您需要并行执行多个函数,然后对这些结果执行其他任务时,您可能会用到这种模式。 可以想象,这种模式对很多项目都很有用,因为我们经常需要根据来自几个其他来源的数据来做一件事。
例如,假设您是一家外卖餐厅,有大量的订单涌入。 您可能会使用这种模式首先获取订单,然后使用该订单计算所有商品的价格、商品的可用性,并查看是否有任何商品有促销或优惠。 也许促销/优惠不会与您的价格存储在同一个地方,因为它们是由外部销售公司控制的。 您可能还需要了解您的送货队列情况以及根据其位置,您的员工中谁应该负责送货。
这需要大量的协调! 但是您需要汇总所有这些信息才能完成订单并进行处理。 当然,这是一个简化且人为的例子,但您可以看到并发处理一些事情有多么有用,以便它们可以随后被一个最终函数使用。
以下是抽象代码和可视化表示
查看 CodePen 上 Sarah Drasner (@sdras) 编写的 持久函数:模式 #2,扇出,扇入。
const df = require('durable-functions')
module.exports = df(function*(ctx) {
const tasks = []
// items to process concurrently, added to an array
const taskItems = yield ctx.df.callActivityAsync('fn1')
taskItems.forEach(item => tasks.push(ctx.df.callActivityAsync('fn2', item))
yield ctx.df.task.all(tasks)
// send results to last function for processing
yield ctx.df.callActivityAsync('fn3', tasks)
})
现在我们已经了解了为什么要使用这种模式,让我们深入研究一个简化的示例,解释它是如何工作的。
设置您的环境以使用持久函数
首先。 我们必须准备好开发环境才能使用持久函数。 让我们将其分解。
GitHub 个人访问令牌
要运行此示例,您需要在 GitHub 中创建一个个人访问令牌。 如果您转到帐户照片下方,打开下拉菜单,然后选择“设置”,然后在左侧边栏中选择“开发者设置”。 在下一个屏幕的同一侧边栏中,点击“个人访问令牌”选项。

然后会弹出一个提示,您可以点击
对于范围/权限选项,我建议选择“存储库”,然后允许点击“生成令牌”按钮并将令牌复制到剪贴板。 请记住,您永远不要提交您的令牌。(如果您这样做,它将被撤销。问我为什么我知道。)如果您需要有关创建令牌的更多信息,请参考 此处提供的说明。
Functions CLI
首先,我们将安装最新版本的 Azure Functions CLI。 我们可以通过在终端中运行以下命令来实现
npm i -g azure-functions-core-tools@core --unsafe-perm true
不安全权限标志是否让您感到不安? 我也曾经感到不安。 实际上,它所做的是在包脚本运行时阻止 UID/GID 切换,这是必要的,因为包本身是围绕 .NET 的 JavaScript 包装器。 也可以在没有此类标志的情况下使用 Brew 进行安装,并且 此处提供了更多相关信息。
可选:在 VS Code 中设置项目
完全没有必要,但我喜欢在 VS Code 中使用 Azure 函数,因为它具有出色的本地调试功能,这在无服务器函数中通常很麻烦。 如果您还没有安装它,您可以在此处进行安装
为 Azure 设置免费试用并创建存储帐户
要运行此示例,您需要 试用 Azure 的免费试用版。 您可以进入门户并在左上角登录。 您将创建一个新的 Blob 存储帐户,并检索密钥。 既然我们已经完成了所有这些准备工作,我们就可以开始了!
设置我们的持久函数
让我们看看我们设置的仓库。 我们将克隆或分叉它
git clone https://github.com/Azure-Samples/durablefunctions-apiscraping-nodejs.git
以下是初始文件结构的样子。

(此可视化图是 使用我的 CLI 工具创建的。)
在 local.settings.json
中,将 GitHubToken
更改为之前从 GitHub 获取的值,并对两个存储密钥执行相同的操作——粘贴之前设置的存储帐户中的密钥。
然后运行
func extensions install
npm i
func host start
现在我们正在本地运行!
理解协调程序
如您所见,我们在 FanOutFanInCrawler
目录中有多个文件夹。 列出的目录中的函数 GetAllRepositoriesForOrganization
、GetAllOpenedIssues
和 SaveRepositories
是我们将要协调的函数。
以下是我们将要执行的操作
- 协调程序将启动
GetAllRepositoriesForOrganization
函数,我们将在其中传入组织名称,该名称从Orchestrator_HttpStart
函数的getInput()
中检索 - 由于这可能不止一个仓库,我们将首先创建一个空数组,然后遍历所有仓库并运行
GetOpenedIssues
,并将它们推送到数组中。 我们在这里运行的所有内容都将并发执行,因为它不在迭代器中的 yield 中 - 然后我们将等待所有任务执行完成,最后调用
SaveRepositories
,它将所有结果存储在 Blob 存储中
由于其他函数相当标准,让我们花点时间深入研究一下协调程序。 如果我们查看协调程序目录内部,我们可以看到它具有一个相当传统的函数设置,具有 index.js
和 function.json
文件。
生成器
在我们深入研究编排器之前,让我们先对生成器进行一个非常简短的旁路介绍,因为如果没有它们,你就无法理解其余的代码。
生成器并不是编写此代码的唯一方法!它也可以通过其他异步 JavaScript 模式来实现。碰巧的是,这是一种非常简洁易读的编写方式,所以让我们快速了解一下。
function* generator(i) {
yield i++;
yield i++;
yield i++;
}
var gen = generator(1);
console.log(gen.next().value); // 1
console.log(gen.next().value); // 2
console.log(gen.next().value); // 3
console.log(gen.next()); // {value: undefined, done: true}
在function*
后面的初始小星号之后,您可以开始使用yield
关键字。调用生成器函数不会完全执行整个函数;而是返回一个迭代器对象。next()
方法将逐个遍历它们,我们会得到一个对象,该对象告诉我们value
和done
——这将是一个布尔值,表示我们是否已完成遍历所有yield
语句。您可以在上面的示例中看到,对于最后一个.next()
调用,返回一个done
为true
的对象,让我们知道我们已经遍历了所有值。
编排器代码
我们将从需要此功能才能正常工作的require
语句开始。
const df = require('durable-functions')
module.exports = df(function*(context) {
// our orchestrator code will go here
})
值得注意的是,那里的星号将创建一个迭代器函数。
首先,我们将从Orchestrator_HttpStart
函数中获取组织名称,并使用GetAllRepositoriesForOrganization
获取该组织的所有存储库。请注意,我们在存储库赋值中使用yield
以使函数按顺序执行。
const df = require('durable-functions')
module.exports = df(function*(context) {
var organizationName = context.df.getInput()
var repositories = yield context.df.callActivityAsync(
'GetAllRepositoriesForOrganization',
organizationName
)
})
然后,我们将创建一个名为output
的空数组,从包含组织所有存储库的数组中创建一个for
循环,并使用它将问题推送到数组中。请注意,我们在这里不使用yield
,以便它们全部并发运行,而不是一个接一个地等待。
const df = require('durable-functions')
module.exports = df(function*(context) {
var organizationName = context.df.getInput()
var repositories = yield context.df.callActivityAsync(
'GetAllRepositoriesForOrganization',
organizationName
)
var output = []
for (var i = 0; i < repositories.length; i++) {
output.push(
context.df.callActivityAsync('GetOpenedIssues', repositories[i])
)
}
})
最后,当所有这些执行完成后,我们将存储结果并将其传递给SaveRepositories
函数,该函数会将它们保存到 Blob 存储中。然后,我们将返回实例的唯一 ID(context.instanceId
)。
const df = require('durable-functions')
module.exports = df(function*(context) {
var organizationName = context.df.getInput()
var repositories = yield context.df.callActivityAsync(
'GetAllRepositoriesForOrganization',
organizationName
)
var output = []
for (var i = 0; i < repositories.length; i++) {
output.push(
context.df.callActivityAsync('GetOpenedIssues', repositories[i])
)
}
const results = yield context.df.Task.all(output)
yield context.df.callActivityAsync('SaveRepositories', results)
return context.instanceId
})
现在,我们拥有了使用此单个编排器管理所有函数所需的所有步骤!
部署
现在是有趣的部分。让我们部署!🚀
要部署组件,Azure 需要您安装 Azure CLI并使用它登录。
首先,您需要预配服务。查看提供的provision.ps1
文件,以熟悉我们将要创建的资源。然后,您可以使用之前生成的 GitHub 令牌执行该文件,如下所示
.\provision.ps1 -githubToken <TOKEN> -resourceGroup <ResourceGroupName> -storageName <StorageAccountName> -functionName <FunctionName>
如果您不想安装 PowerShell,也可以获取provision.ps1
中的命令并手动运行它。
就是这样!我们的 Durable Function 已经启动并运行。