vapor-docs/docs/advanced/queues.zh.md

451 lines
14 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 队列
Vapor Queues ([vapor/queues](https://github.com/vapor/queues)) 是一个纯 Swift 队列系统,它允许你将任务责任转移给一个辅助 worker。
这个包可以很好地完成以下任务:
- 在主请求线程外发送电子邮件
- 执行复杂或耗时的数据库操作
- 确保 job 的完整性和弹性
- 通过延迟非关键处理来加快响应时间
- 调度 job 在特定时间发生
这个包类似于 [Ruby Sidekiq](https://github.com/mperham/sidekiq)。它提供以下功能:
- 安全处理托管提供商发送的指示关闭、重启或新部署的 `SIGTERM``SIGINT` 信号。
- 不同的队列优先级。例如,可以指定在电子邮件队列上运行一个队列 job在数据处理队列上运行另一个 job。
- 实现可靠的队列进程帮助处理意外故障。
- 包含一个 `maxRetryCount` 特性,该特性将重复执行任务,直到任务成功,直到指定的计数。
- 使用 NIO 将所有可用的内核和 EventLoops 用于 job。
- 允许用户调度重复任务
目前,队列有一个官方支持的驱动程序,它与主协议接口:
- [QueuesRedisDriver](https://github.com/vapor/queues-redis-driver)
Queues 也有基于社区的驱动程序:
- [QueuesMongoDriver](https://github.com/vapor-community/queues-mongo-driver)
- [QueuesFluentDriver](https://github.com/vapor-community/vapor-queues-fluent-driver)
!!! tip "建议"
你不应该直接安装 `vapor/queues` 包,除非你正在构建一个新的驱动程序。安装其中一个驱动软件包即可。
## 入门
让我们看看如何使用队列。
### Package
使用队列的第一步是在 SwiftPM 文件中添加一个驱动程序作为项目的依赖项。在本例中,我们将使用 Redis 驱动程序。
```swift
// swift-tools-version:5.8
import PackageDescription
let package = Package(
name: "MyApp",
dependencies: [
/// Any other dependencies ...
.package(url: "https://github.com/vapor/queues-redis-driver.git", from: "1.0.0"),
],
targets: [
.executableTarget(name: "App", dependencies: [
// Other dependencies
.product(name: "QueuesRedisDriver", package: "queues-redis-driver")
]),
.testTarget(name: "AppTests", dependencies: [.target(name: "App")]),
]
)
```
如果你直接在 Xcode 中编辑清单文件,它会在文件保存时自动获取更改并获取新的依赖项。否则,从终端运行 `swift package resolve` 命令以获取新的依赖项。
### 配置
下一步是在 `configure.swift` 文件中配置队列,我们将使用 Redis 库作为示例:
```swift
import QueuesRedisDriver
try app.queues.use(.redis(url: "redis://127.0.0.1:6379"))
```
### 注册一个 `Job`
在为 job 建模后,你必须将其添加到配置部分中,如下所示:
```swift
// 注册 Job
let emailJob = EmailJob()
app.queues.add(emailJob)
```
### Worker 作为进程运行
要启动新的队列 worker请在终端运行 `swift run App queues`。 你还可以指定一个特定类型的 worker 来运行 `swift run App queues --queue emails`
!!! tip "建议"
生产环境应该保持 worker 一直运行。咨询你的托管提供商了解如何保持长时间运行的进程处于活动状态。例如Heroku 允许你在 Procfile 中指定这样的 “worker” dynos`worker: Run queues`。有了这个,你可以在仪表板/资源选项卡上启动 worker或者使用 `heroku ps:scale worker=1`(或首选的任何数量的 dynos
### 进程中运行 Worker
要在与你的应用程序相同的进程中运行 worker而不是启动一个完整的单独服务器来处理它请调用 `Application` 上的便利方法:
```swift
try app.queues.startInProcessJobs(on: .default)
```
要在进程中运行调度 job请调用以下方法
```swift
try app.queues.startScheduledJobs()
```
!!! warning "警告"
如果你不通过命令行或进程内 worker 启动队列job 将不会派发。
## `Job` 协议
Job 由 `Job``AsyncJob` 协议定义。
### 建模 `Job` 对象:
```swift
import Vapor
import Foundation
import Queues
struct Email: Codable {
let to: String
let message: String
}
struct EmailJob: Job {
typealias Payload = Email
func dequeue(_ context: QueueContext, _ payload: Email) -> EventLoopFuture<Void> {
// 这是你要发送电子邮件的位置
return context.eventLoop.future()
}
func error(_ context: QueueContext, _ error: Error, _ payload: Email) -> EventLoopFuture<Void> {
// 如果你不想处理错误,只需返回一个 future 对象。你也可以完全省略此功能。
return context.eventLoop.future()
}
}
```
如果使用 `async`/`await`,你应该使用 `AsyncJob`
```swift
struct EmailJob: AsyncJob {
typealias Payload = Email
func dequeue(_ context: QueueContext, _ payload: Email) async throws {
// 这是你要发送电子邮件的位置
}
func error(_ context: QueueContext, _ error: Error, _ payload: Email) async throws {
// 如果你不想处理错误,只需 return。你也可以完全省略此功能。
}
}
```
!!! info "信息"
确保你的 `Payload` 类型遵循 `Codable` 协议。
!!! tip "建议"
不要忘记按照**入门**中的说明将此 job 添加到你的配置文件中。
## 派发 Job
要派发队列 job你需要访问 `Application` 或者 `Request` 的实例。你很可能会在路由内派发 job
```swift
app.get("email") { req -> EventLoopFuture<String> in
return req
.queue
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message")
).map { "done" }
}
// 或
app.get("email") { req async throws -> String in
try await req.queue.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"))
return "done"
}
```
相反,如果你需要从 `Request` 对象不可用的上下文中分派 job (例如,从 `Command` 中),你需要使用 `Application` 对象中的 `queues` 属性,例如:
```swift
struct SendEmailCommand: AsyncCommand {
func run(using context: CommandContext, signature: Signature) async throws {
context
.application
.queues
.queue
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message")
)
}
}
```
### 设置 `maxRetryCount`
如果你指定 `maxRetryCount`job 将在出错时自动重试。例如:
```swift
app.get("email") { req -> EventLoopFuture<String> in
return req
.queue
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3
).map { "done" }
}
// 或
app.get("email") { req async throws -> String in
try await req.queue.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3)
return "done"
}
```
### 指定延迟
还可以将 job 设置为仅在某个特定 `Date` 过后运行。要指定延迟,将 `Date` 传入 `Dispatch` 中的 `delayUntil` 参数中:
```swift
app.get("email") { req async throws -> String in
let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // 一天
try await req.queue.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate)
return "done"
}
```
如果 job 在其延迟参数之前出队,则该 job 将由驱动程序重新排队。
### 指定优先级
根据你的需要,可以将 job 分为不同的队列类型/优先级。例如,你可能想要打开一个 `email` 队列和一个 `background-processing` 队列来对 job 进行排序。
从扩展 `QueueName` 开始:
```swift
extension QueueName {
static let emails = QueueName(string: "emails")
}
```
然后,在检索 `jobs` 对象时指定队列类型:
```swift
app.get("email") { req -> EventLoopFuture<String> in
let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // 一天
return req
.queues(.emails)
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate
).map { "done" }
}
// 或
app.get("email") { req async throws -> String in
let futureDate = Date(timeIntervalSinceNow: 60 * 60 * 24) // 一天
try await req
.queues(.emails)
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate
)
return "done"
}
```
当从 `Application` 对象内部访问时,你应该这样做:
```swift
struct SendEmailCommand: AsyncCommand {
func run(using context: CommandContext, signature: Signature) async throws {
context
.application
.queues
.queue(.emails)
.dispatch(
EmailJob.self,
.init(to: "email@email.com", message: "message"),
maxRetryCount: 3,
delayUntil: futureDate
)
}
}
```
如果未指定队列,则 job 将在 `default` 队列上运行。确保按照**入门**中的说明为每种队列类型启动 worker。
## 调度 Job
Queues 包还允许你安排在特定时间点发生的 job。
!!! warning "警告"
调度 job 只有在应用程序启动之前设置,例如在 `configure.swift` 中设置时才能正常工作。它们无法在路由处理中工作。
### 启动调度 worker
调度程序需要一个独立的 worker 进程来运行,类似于队列 worker 进程。可以通过以下命令启动 worker
```sh
swift run App queues --scheduled
```
!!! tip "建议"
生产环境应该保持 worker 一直运行。请咨询你的服务托管提供商了解如何使长时间运行的进程保持活动状态。例如Heroku 允许你在 Procfile 中像这样指定 “worker” dynos`worker: App queues --scheduled`
### 创建一个 `ScheduledJob`
首先,首先创建一个新的 `ScheduledJob` 或者 `AsyncScheduledJob`
```swift
import Vapor
import Queues
struct CleanupJob: ScheduledJob {
// 如果你需要,可以通过依赖注入在这里添加额外的服务。
func run(context: QueueContext) -> EventLoopFuture<Void> {
// 在这里做一些工作,也许队列等待另一个 job。
return context.eventLoop.makeSucceededFuture(())
}
}
struct CleanupJob: AsyncScheduledJob {
// 如果你需要,可以通过依赖注入在这里添加额外的服务。
func run(context: QueueContext) async throws {
// 在这里做一些工作,也许队列等待另一个 job。
}
}
```
然后,在你的配置代码中,注册调度的 job
```swift
app.queues.schedule(CleanupJob())
.yearly()
.in(.may)
.on(23)
.at(.noon)
```
上述示例中的 job 将在每年5月23日中午12:00运行。
!!! tip "建议"
调度程序采用你服务器的时区。
### 可用的构建器方法
在调度程序上可以调用五个主要方法,每个方法都会创建其各自的包含更多辅助方法的构建器对象。你应该继续构建一个调度器对象,直到编译器没有向你发出有关未使用结果的警告。有关所有可用方法,请参见下文:
| 辅助函数 | 可用修饰符 | 描述 |
|-----------------|---------------------------------------|--------------------------------------------------------------------------------|
| `yearly()` | `in(_ month: Month) -> Monthly` | 运行 job 的月份。返回一个 `Monthly` 对象以进行进一步构建。|
| `monthly()` | `on(_ day: Day) -> Daily` | 运行 job 的日期。返回用于进一步构建的 `Daily` 对象。|
| `weekly()` | `on(_ weekday: Weekday) -> Daily` | 运行 job 的星期。返回一个 `Daily` 对象。|
| `daily()` | `at(_ time: Time)` | 运行 job 的时间。链中的最终方法。|
| | `at(_ hour: Hour24, _ minute: Minute)`| 运行 job 的小时和分钟。链中的最终方法。|
| | `at(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod)` |运行 job 的小时、分钟和时间段。链中的最终方法。|
| `hourly()` | `at(_ minute: Minute)` |运行 job 的分钟。链中的最终方法。|
| `minutely()` | `at(_ second: Second)` | 运行 job 的秒数。链中的最终方法。 |
### 可用辅助函数
队列附带了一些辅助函数枚举以使调度更加容易:
| 辅助函数 | 辅助函数枚举 |
|-----------------|---------------------------------------|
| `yearly()` | `.january`, `.february`, `.march`, ...|
| `monthly()` | `.first`, `.last`, `.exact(1)` |
| `weekly()` | `.sunday`, `.monday`, `.tuesday`, ... |
| `daily()` | `.midnight`, `.noon` |
要使用辅助枚举,请在辅助函数上调用适当的修饰符并传递值。例如:
```swift
// 每年一月
.yearly().in(.january)
// 每月第一天
.monthly().on(.first)
// 每周周日
.weekly().on(.sunday)
// 每天午夜
.daily().at(.midnight)
```
## 事件委托
Queues 包允许你指定 `JobEventDelegate` 对象, 当 worker 对 job 执行操作时接收通知。这可用于监控、追踪或报警等目的。
首先,对象需要遵循 `JobEventDelegate` 协议并实现所需的方法
```swift
struct MyEventDelegate: JobEventDelegate {
/// 当 job 从路由中分派给队列工作者时调用
func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
/// 当 job 放入处理队列并开始工作时调用
func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
/// 当 job 完成处理并从队列中删除时调用
func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
/// 当 job 完成处理但出现错误时调用
func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
}
```
然后,将其添加到你的配置文件中:
```swift
app.queues.add(MyEventDelegate())
```
有许多第三方包使用委托功能来提供对队列工作额外的观察:
- [QueuesDatabaseHooks](https://github.com/vapor-community/queues-database-hooks)
- [QueuesDash](https://github.com/gotranseo/queues-dash)