python 通过 flask-aphotoshopcheduler 实现定期执行任务
Flask-APScheduler 介绍
Flask-APScheduler 是基于 APScheduler 库开发的 Flask 拓展库。APScheduler 的全称是Advanced Python Scheduler 。本文主要通过 APScheduler 中的 cron触发器来实现定时启动任务的目的。
安装 flask_apscheduler 库
pip install flask_apscheduler
导入时报错
-
error: subprocess-exited-with-error
-
-
× pip subprocess to install build dependencies did not run successfully.
-
│ exit code: 2
-
╰─> See above for output.
-
-
note: This error originates from a subprocess, and is likely not a problem with pip.
在网上查阅相关资料,使用如下方式成功安装 flask-apscheduler。但是运行程序时,仍然识别不到flask-apscheduler 库
-
pip install --upgrade setuptools
-
pip install flask-apscheduler
基本结构
-
from flask import Flask
-
from flask_apscheduler import APScheduler
-
-
-
class Config():
-
JOBS = [
-
{
-
'id': 'job1',
-
'func': 'test:test', # 第一个test为函数所在py文件名
-
'args': (), # 函数不需要参数时,可以设置为空元组,也可以直接省略
-
'trigger': 'cron', # 使用cron触发器
-
'day': '*', # * 表示每一天
-
'hour': '10',
-
'minute': '0',
-
'second': '0'
-
-
}
-
]
-
SCHEDULER_API_ENABLED = True
-
-
-
def test():
-
print('success!')
-
-
-
if __name__ == '__main__':
-
app = Flask(__name__)
-
-
app.config.from_object(Config())
-
-
scheduler = APScheduler()
-
scheduler.init_app(app) # 将调度器对象与Flask应用程序实例(app)相关联
-
scheduler.start()
-
-
app.run(host='0.0.0.0', port=8000, debug=True)
-
其中,SCHEDULER_API_ENABLED = True 用来启用 Flask-APScheduler 库的 API 功能,设置为 True 后,可以使用 API 来查看和管理已经调度的任务。例如,可以通过对 http://127.0.0.1:8000/scheduler/jobs 发送 GET 请求,来获取所有已添加的 Job 信息。(如果不需要 API 功能,可以设置为 False ,以提高安全性和性能)
前端返回结果如下:
-
[
-
{
-
"id": "job1",
-
"name": "job1",
-
"func": "test:test",
-
"args": [],
-
"kwargs": {},
-
"trigger": "cron",
-
"day": "*",
-
"hour": "10",
-
"minute": "0",
-
"second": "0",
-
"misfire_grace_time": 1,
-
"max_instances": 1,
-
"next_run_time": "2023-05-19T10:00:00 08:00"
-
}
-
]
app.config.from_object() 用于从一个 python 对象中加载配置变量将其应用于 Flask 应用程序。在这段代码中,通过 app.config['JOBS'] 可以获取到定义的调度任务列表,而通过 app.config['SCHEDULER_API_ENABLED'] 可以获取到 API 是否被启用。其他 Flask 扩展也可以通过这种方式使用配置变量,例如 Flask-SQLAlchemy 扩展需要从 SQLALCHEMY_DATABASE_URI 变量中获取数据库连接字符串。
应用实例
需求:每天定时检查文件夹大小,并进行管理
-
import os
-
import time
-
-
from flask import Flask
-
from flask_apscheduler import APScheduler
-
-
-
def folder_size(path):
-
total = 0
-
for entry in os.scandir(path):
-
if entry.is_file():
-
total = entry.stat().st_size
-
elif entry.is_dir():
-
total = folder_size(entry.path)
-
return total
-
-
-
def mng_folder_size():
-
for f in os.listdir('/home/user/video'):
-
if (folder_size('/home/user/video' f)) / 1024 * 1024 * 1024 >= 5:
-
current_time = time.time()
-
for i in os.listdir('/home/user/video/' f):
-
creation_time = os.path.getmtime('/home/user/video/' f '/' i)
-
if (current_time - creation_time) // (24 * 60 * 60) >= 7:
-
os.unlink('/home/user/video/' f '/' i)
-
print(f'{i} removed.')
-
-
-
class Config(object):
-
JOBS = [
-
{
-
'id': 'job1',
-
'func': mng_folder_size,
-
'trigger': 'cron',
-
'day': '*',
-
'hour': '0',
-
'minute': '0',
-
'second': '0'
-
},
-
{
-
'id': 'job2',
-
'func': mng_folder_size,
-
'trigger': 'cron',
-
'day': '*',
-
'hour': '12',
-
'minute': '0',
-
'second': '0'
-
}
-
]
-
SCHEDULER_API_ENABLED = True
-
-
-
if __name__ == '__main__':
-
app = Flask(__name__) # 也可以写在上边
-
-
app.config.from_object(Config())
-
-
scheduler = APScheduler()
-
scheduler.init_app(app)
-
scheduler.start()
-
-
app.run(host='0.0.0.0', port=5055, debug=True)
folder_size() 用于计算文件夹大小。它使用递归来遍历整个目录树,判断是文件还是目录。如果是文件,直接将其大小计入 total ;如果是目录,则递归调用本函数以继续遍历其中所有的文件和子目录,并计算大小。
os.scandir() 用于遍历指定目录下的文件和子目录。与 os.listdir() 不同,os.scandir() 返回一个迭代器对象,可以逐个访问每个目录项,并且提供了对象的一些属性和方法:
- name : 目录项的名称(str)
- path : 目录项的路径(str)
- is_file() : 判断目录项是否为普通文件
- is_dir() : 判断目录项是否为目录
- is_symlink() : 判断目录项是否为符号链接
- stat() : 获取目录项的状态信息
遇到的问题
-
D:\ProgramData\Anaconda3\lib\site-packages\apscheduler\util.py:428: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.html
-
return tzinfo.localize(dt)
在Config类中加入 SCHEDULER_TIMEZONE = 'Asia/Shanghai' 即可解决时区问题。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggfbga
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13