Example And Deploy
Examples
For full examples see github.
Run Executor App
import asyncio
import time
from pyxxl import ExecutorConfig, PyxxlRunner
from pyxxl.ctx import g
# 如果xxl-admin可以直连executor的ip,可以不填写executor_listen_host
config = ExecutorConfig(
xxl_admin_baseurl="http://localhost:8080/xxl-job-admin/api/",
executor_app_name="xxl-job-executor-sample",
executor_host="127.0.0.1", # xxl-admin访问executor的地址
executor_listen_host="0.0.0.0", # xxl-admin监听时绑定的host
debug=True,
)
app = PyxxlRunner(config)
@app.register(name="demoJobHandler")
async def test_task():
# you can get task params with "g"
g.logger.info("get executor params: %s" % g.xxl_run_data.executorParams)
for i in range(10):
g.logger.warning("test logger %s" % i)
await asyncio.sleep(5)
return "成功..."
@app.register(name="xxxxx")
async def test_task3():
await asyncio.sleep(3)
return "成功3"
@app.register(name="sync_func")
def test_task4():
# 如果要在xxl-admin上看到执行日志,打印日志的时候务必用g.logger来打印,默认只打印info及以上的日志
n = 1
g.logger.info("Job %s get executor params: %s" % (g.xxl_run_data.jobId, g.xxl_run_data.executorParams))
# 如果同步任务里面有循环,为了支持cancel操作,必须每次都判断g.cancel_event.
while n <= 10 and not g.cancel_event.is_set():
g.logger.info(
"log to {} logger test_task4.{},params:{}".format(
g.xxl_run_data.jobId,
n,
g.xxl_run_data.executorParams,
)
)
time.sleep(2)
n += 1
return "成功3"
if __name__ == "__main__":
app.run_executor()
Run With Gunicorn Server
Note
由于Gunicorn一般是多进程模式部署,所以executor不能在webapp中绑定启动
需要在Gunicorn启动之后(when_ready)单独起一个进程来运行executor
此部署方式适用于任何可以被Gunicorn部署的web框架集成PYXXL
app.py
import asyncio
from fastapi import FastAPI
from pyxxl import JobHandler
app = FastAPI()
xxl_handler = JobHandler()
@app.get("/")
async def root():
return {"message": "Hello World"}
@xxl_handler.register(name="demoJobHandler")
async def test_task():
await asyncio.sleep(10)
return "成功10"
gunicorn.conf.py
import atexit
from multiprocessing.util import _exit_function
from pyxxl import ExecutorConfig, PyxxlRunner
bind = ["0.0.0.0:8000"]
backlog = 512
workers = 1
timeout = 300
graceful_timeout = 2
limit_request_field_size = 8192
def when_ready(server):
# pylint: disable=import-outside-toplevel,unused-import,no-name-in-module
from app import xxl_handler
atexit.unregister(_exit_function)
config = ExecutorConfig(
xxl_admin_baseurl="http://localhost:8080/xxl-job-admin/api/",
executor_app_name="xxl-job-executor-sample",
executor_host="172.17.0.1",
debug=True,
)
pyxxl_app = PyxxlRunner(config, handler=xxl_handler)
server.pyxxl_app = pyxxl_app
pyxxl_app.run_with_daemon()
Run with Flask (Only for develop)
Warning
此案例仅用于开发模式和本地调试使用,部署到生产环境时强烈建议和webapp分开部署!
由于Flask一般是多进程模式部署,和executor一起部署时需要executor需要单独启动,不能绑定在flask_app上
如果确定要部署在一起,参考gunicorn的案例
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello_world():
return "<p>Hello, World!</p>"
if __name__ == "__main__":
# 如果是多进程部署的flask无法这样使用,请参考gunicorn的example
# 此案例仅用于本地开发和调试用
from multiprocessing import freeze_support
from executor_app import app as pyxxl_app
freeze_support()
app.pyxxl_app = pyxxl_app
pyxxl_app.run_with_daemon()
app.run()