import asyncio import signal from typing import Any import uvicorn from fastapi import FastAPI from loguru import logger async def serve_http(app: FastAPI, **uvicorn_kwargs: Any): config = uvicorn.Config(app, **uvicorn_kwargs) server = uvicorn.Server(config) loop = asyncio.get_running_loop() server_task = loop.create_task(server.serve()) def signal_handler() -> None: # prevents the uvicorn signal handler to exit early server_task.cancel() async def dummy_shutdown() -> None: pass loop.add_signal_handler(signal.SIGINT, signal_handler) loop.add_signal_handler(signal.SIGTERM, signal_handler) try: await server_task return dummy_shutdown() except asyncio.CancelledError: logger.info("Gracefully stopping http server") return server.shutdown()