跳到主要内容

生命周期

lifespan 是一个 ASGI 协议,在你的应用启动和停止时会被调用。

lihil 期望一个具有以下接口的生命周期:

type LifeSpan = Callable[
["Lihil"], AsyncContextManager[None] | AsyncGenerator[None, None]
]

示例:


async def example_lifespan(app: Lihil):
engine = app.graph.resolve(create_async_engine)
await engine.execute(text("SELECT 1"))
yield
await engine.dispose()

lhl = Lihil(lifespan=example_lifespan)

何时使用生命周期以及在生命周期处理器中做什么

ASGI 生命周期处理器是管理异步应用程序启动和关闭逻辑的绝佳方式。这些处理器在应用程序启动和停止时各运行一次,使它们非常适合管理需要异步初始化和清理的资源。

测试服务可用性

from sqlalchemy.ext.asyncio import create_async_engine

async def lifespan(app: Lihil):
engine = create_async_engine(app.config.database.url)
await engine.execute(text("SELECT 1"))
yield
await engine.dispose() # 关闭时关闭连接

await engine.execute(text("SELECT 1")) 向你的数据库发送一个虚拟查询,因此测试服务的可用性,如果失败,应用程序将快速失败而不启动。

创建需要事件循环的单例对象

在使用异步客户端或服务(如 Kafka 生产者或数据库客户端)时,你需要确保它们在事件循环内启动:

from aiokafka import AIOKafkaProducer

async def lifespan(app: Lihil):
kafka = AIOKafkaProducer(bootstrap_servers=app.config.kafka.url)
await kafka.start()
yield
await kafka.stop()

这里,Kafka 生产者在生命周期处理器内作为单例对象创建。生产者在启动时连接到 Kafka 代理,并在应用停止时正确关闭。

清理资源

在关闭期间,清理任何连接、释放资源或优雅地停止服务很重要:

async def lifespan(app: Lihil):
engine = create_async_engine(...)
kafka = AIOKafkaProducer(...)
await kafka.start()
yield
await kafka.stop() # 干净地停止 Kafka 生产者
await engine.dispose() # 处理引擎或任何数据库连接

日志记录

生命周期处理器也是记录关键事件(如应用启动和关闭)的好地方:

import logging

logger = logging.getLogger(__name__)

async def lifespan(app):
logger.info("App is starting...")
await engine.execute(text("SELECT 1"))
yield
logger.info("App is shutting down...")
await engine.dispose()

注册指标(例如 Prometheus)

在某些情况下,你可能想在启动时初始化监控或指标服务:

from prometheus_client import start_http_server, Counter

metrics_counter = Counter('app_start', 'App has started')

async def lifespan(app):
# 在单独的端口上启动 Prometheus 指标服务器
start_http_server(app.config.prometheus.url)
metrics_counter.inc() # 增加计数器以跟踪应用启动
yield

调度后台任务或初始化任务调度器

如果你的应用需要调度定期任务(例如使用 Celery 或其他调度器),你可以在生命周期处理器中执行此操作:

import asyncio

async def schedule_tasks():
while True:
print("Running periodic task...")
await asyncio.sleep(60) # 每 60 秒运行一次

async def lifespan(app):
task = asyncio.create_task(schedule_tasks())
yield
# 在关闭期间取消后台任务
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

预加载缓存或内存数据

如果你的应用依赖于预加载某些数据(如缓存或配置),你可以在生命周期处理器中执行此操作以加快应用启动速度:

async def preload_cache():
# 示例:将经常访问的数据预加载到缓存中
await cache.set("key", "value")

async def lifespan(app):
# 在应用启动时预加载缓存
await preload_cache()
yield

技术细节

当 ASGI 服务器(例如 uvicorn)启动和停止时,它向它托管的 Web 框架(例如 lihil)发送生命周期事件。 lihil 接收生命周期消息后,它会首先运行用户提供的生命周期处理器(如果有的话),然后运行内部设置。