Skip to content

Example And Deploy

Examples

For full examples see github.

Run Executor App

import asyncio
import time

from pyxxl import ExecutorConfig, PyxxlRunner
from pyxxl.ctx import g

# 如果xxl-admin可以直连executor的ip,可以不填写executor_listen_host
config = ExecutorConfig(
    xxl_admin_baseurl="http://localhost:8080/xxl-job-admin/api/",
    executor_app_name="xxl-job-executor-sample",
    executor_host="127.0.0.1",  # xxl-admin访问executor的地址
    executor_listen_host="0.0.0.0",  # xxl-admin监听时绑定的host
    debug=True,
)

app = PyxxlRunner(config)


@app.register(name="demoJobHandler")
async def test_task():
    # you can get task params with "g"
    g.logger.info("get executor params: %s" % g.xxl_run_data.executorParams)
    for i in range(10):
        g.logger.warning("test logger %s" % i)
    await asyncio.sleep(5)
    return "成功..."


@app.register(name="xxxxx")
async def test_task3():
    await asyncio.sleep(3)
    return "成功3"


@app.register(name="sync_func")
def test_task4():
    # 如果要在xxl-admin上看到执行日志,打印日志的时候务必用g.logger来打印,默认只打印info及以上的日志
    n = 1
    g.logger.info("Job %s get executor params: %s" % (g.xxl_run_data.jobId, g.xxl_run_data.executorParams))
    # 如果同步任务里面有循环,为了支持cancel操作,必须每次都判断g.cancel_event.
    while n <= 10 and not g.cancel_event.is_set():
        g.logger.info(
            "log to {} logger test_task4.{},params:{}".format(
                g.xxl_run_data.jobId,
                n,
                g.xxl_run_data.executorParams,
            )
        )
        time.sleep(2)
        n += 1
    return "成功3"


if __name__ == "__main__":
    app.run_executor()
python3 executor_app.py

Run With Gunicorn Server

Note

由于Gunicorn一般是多进程模式部署,所以executor不能在webapp中绑定启动

需要在Gunicorn启动之后(when_ready)单独起一个进程来运行executor

此部署方式适用于任何可以被Gunicorn部署的web框架集成PYXXL

app.py

import asyncio

from fastapi import FastAPI

from pyxxl import JobHandler

app = FastAPI()
xxl_handler = JobHandler()


@app.get("/")
async def root():
    return {"message": "Hello World"}


@xxl_handler.register(name="demoJobHandler")
async def test_task():
    await asyncio.sleep(10)
    return "成功10"

gunicorn.conf.py

import atexit
from multiprocessing.util import _exit_function

from pyxxl import ExecutorConfig, PyxxlRunner

bind = ["0.0.0.0:8000"]
backlog = 512
workers = 1
timeout = 300
graceful_timeout = 2
limit_request_field_size = 8192


def when_ready(server):
    # pylint: disable=import-outside-toplevel,unused-import,no-name-in-module
    from app import xxl_handler

    atexit.unregister(_exit_function)

    config = ExecutorConfig(
        xxl_admin_baseurl="http://localhost:8080/xxl-job-admin/api/",
        executor_app_name="xxl-job-executor-sample",
        executor_host="172.17.0.1",
        debug=True,
    )

    pyxxl_app = PyxxlRunner(config, handler=xxl_handler)
    server.pyxxl_app = pyxxl_app
    pyxxl_app.run_with_daemon()

gunicorn -c gunicorn.conf.py app:app -b 0.0.0.0:9000

Run with Flask (Only for develop)

Warning

此案例仅用于开发模式和本地调试使用,部署到生产环境时强烈建议和webapp分开部署!

由于Flask一般是多进程模式部署,和executor一起部署时需要executor需要单独启动,不能绑定在flask_app上

如果确定要部署在一起,参考gunicorn的案例

from flask import Flask

app = Flask(__name__)


@app.route("/")
def hello_world():
    return "<p>Hello, World!</p>"


if __name__ == "__main__":
    # 如果是多进程部署的flask无法这样使用,请参考gunicorn的example
    # 此案例仅用于本地开发和调试用
    from multiprocessing import freeze_support

    from executor_app import app as pyxxl_app

    freeze_support()

    app.pyxxl_app = pyxxl_app
    pyxxl_app.run_with_daemon()
    app.run()