When developing applications and microservices, we often need to run scheduled tasks. Examples include cleaning up temporary files, performing a daily backup, polling a job queue, gathering metrics periodically and sending emails.
Python supports multiple approaches to running scheduled or recurring jobs. Let us explore some of these in this article.
Using Advanced Python Scheduler (APScheduler)
https://apscheduler.readthedocs.io/en/stable/
Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically. APScheduler can store jobs in a database, which lets them keep their state and survive scheduler restarts.
Before you start, install the apscheduler module using pip.
pip install apscheduler
APScheduler has four kinds of components.
- Triggers contain the scheduling logic.
- Job stores house the scheduled jobs. The default job store simply keeps the jobs in memory, but others store them in various kinds of databases.
- Executors are what handle the running of the jobs.
- Schedulers are what bind the rest together. You typically have only one scheduler running in your application.
    from apscheduler.schedulers.background import BackgroundScheduler
    import time

    # Job to perform
    def worker_function():
        print("In worker function .. started")

    job_defaults = {
        'max_instances': 1
    }

    # Create and start the background scheduler
    scheduler = BackgroundScheduler(job_defaults=job_defaults)
    scheduler.add_job(worker_function, 'interval', seconds=5)
    scheduler.start()

    # Keep main loop active
    while True:
        time.sleep(10)

    In worker function .. started
    In worker function .. started
    In worker function .. started
    In worker function .. started
    In worker function .. started
    In worker function .. started
    ....
    from apscheduler.schedulers.background import BackgroundScheduler
    import time

    # Job to perform; it takes longer than the 5-second trigger interval
    def worker_function():
        print("In worker function .. started")
        time.sleep(10)

    job_defaults = {
        'max_instances': 1
    }

    # Create and start the background scheduler
    scheduler = BackgroundScheduler(job_defaults=job_defaults)
    scheduler.add_job(worker_function, 'interval', seconds=5)
    scheduler.start()

    # Keep main loop active
    while True:
        time.sleep(10)

The background scheduler triggers worker_function every 5 seconds. Because each run now sleeps for 10 seconds, a new trigger arrives while the previous run is still in progress, and the scheduler skips it once the max_instances limit is reached. You will observe the following output.
    In worker function .. started
    Execution of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 12:55:26 IST)" skipped: maximum number of running instances reached (1)
    Execution of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 12:55:31 IST)" skipped: maximum number of running instances reached (1)
    In worker function .. started
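The skip-while-running behaviour can be pictured with a small standard-library model. This is only a sketch of the idea, not APScheduler's implementation: the names `RunLimiter`, `try_run` and `slow_job` are hypothetical, and a semaphore stands in for the scheduler's own instance counting.

```python
import threading
import time


class RunLimiter:
    """Hypothetical helper: allows at most `max_instances` concurrent runs."""

    def __init__(self, max_instances=1):
        self._slots = threading.BoundedSemaphore(max_instances)

    def try_run(self, func):
        """Start func in a thread, or return False if the limit is reached."""
        if not self._slots.acquire(blocking=False):
            return False  # limit reached: this trigger is skipped

        def runner():
            try:
                func()
            finally:
                self._slots.release()  # free the slot when the run ends

        threading.Thread(target=runner, daemon=True).start()
        return True


def slow_job():
    time.sleep(0.3)  # stands in for a job that outlasts its trigger interval


limiter = RunLimiter(max_instances=1)
first = limiter.try_run(slow_job)    # starts the job
second = limiter.try_run(slow_job)   # skipped: the first run is still in progress
time.sleep(0.5)                      # give the first run time to finish
third = limiter.try_run(slow_job)    # allowed again
print(first, second, third)
```

Raising `max_instances` in this model lets overlapping runs proceed in parallel instead of being skipped.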
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    import time

    # Job to perform
    def worker_function():
        print("In worker function .. started")

    # Persist jobs in MongoDB instead of the default in-memory store
    job_stores = {
        'default': MongoDBJobStore(database='apscheduler', collection='jobs',
                                   host='localhost', port=27017)
    }

    job_defaults = {
        'coalesce': False,
        'max_instances': 1
    }

    # Create and start the background scheduler
    scheduler = BackgroundScheduler(jobstores=job_stores, job_defaults=job_defaults)
    scheduler.add_job(worker_function, 'interval', seconds=5)
    scheduler.start()

    # Keep main loop active
    while True:
        time.sleep(10)

Run the above program and, after a few triggers of the worker function, stop it. Wait for 30 seconds and then restart the program. Because the job is persisted, the scheduler picks up every run time that was missed while the program was stopped and processes each of them. To collapse multiple missed runs into a single one, set the coalesce flag to True.
    Run time of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 14:57:25 IST)" was missed by 0:00:11.656328
    Run time of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 14:57:25 IST)" was missed by 0:00:06.656372
    Run time of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 14:57:25 IST)" was missed by 0:00:01.656415
    In worker function .. started
    In worker function .. started
    In worker function .. started
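To see what the coalesce flag changes, it can help to model the missed run times directly. The following is a minimal standard-library sketch of the idea, not APScheduler's code: the function names and the timestamps are made up for illustration.

```python
from datetime import datetime, timedelta


def missed_run_times(next_run, interval, now):
    """Return every scheduled run time from next_run up to and including now."""
    times = []
    while next_run <= now:
        times.append(next_run)
        next_run += interval
    return times


def runs_to_process(run_times, coalesce):
    """With coalesce on, keep only the most recent missed run time."""
    return run_times[-1:] if coalesce else run_times


interval = timedelta(seconds=5)
next_run_at_stop = datetime(2021, 4, 30, 14, 57, 10)     # illustrative timestamp
restarted_at = next_run_at_stop + timedelta(seconds=12)  # program was down for 12 s

missed = missed_run_times(next_run_at_stop, interval, restarted_at)
without_coalesce = runs_to_process(missed, coalesce=False)
with_coalesce = runs_to_process(missed, coalesce=True)
print(len(without_coalesce), len(with_coalesce))
```

In this model a 12-second outage with a 5-second interval leaves three run times to handle without coalescing, and only the latest one with it.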
Using schedule
https://pypi.org/project/schedule/
The Python schedule module offers a simple-to-use API for scheduling jobs. It is very lightweight and has no external dependencies. The library is designed to be a simple solution for simple scheduling problems; you should probably look elsewhere if you need:
- Job persistence
- Exact timing (sub-second precision execution)
- Concurrent execution (multiple threads)
- Localization (time zones, workdays or holidays)
Before you start, install the schedule module using pip.
pip install schedule
    import schedule
    import time

    def job1():
        print("I'm job1 ...")

    def job2():
        print("I'm job2 ...")

    # Register the jobs with their intervals
    schedule.every(3).seconds.do(job1)
    schedule.every(5).seconds.do(job2)

    # Run any jobs that are due, checking once per second
    while True:
        schedule.run_pending()
        time.sleep(1)
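Since schedule runs pending jobs one after another in the thread that calls run_pending(), a slow job delays the others. One possible workaround, sketched here with only the standard library, is to schedule a small wrapper that starts the real job in its own thread. The names `run_threaded` and `slow_job` are illustrative, and the commented line shows how the wrapper might be registered, relying on `do()` passing extra arguments through to the callable.

```python
import threading
import time

results = []


def slow_job():
    time.sleep(0.2)  # stands in for slow work
    results.append("slow_job done")


def run_threaded(job_func):
    """Start job_func in its own thread and return immediately."""
    worker = threading.Thread(target=job_func)
    worker.start()
    return worker


# With schedule this could be registered as (assumes `import schedule`):
#   schedule.every(3).seconds.do(run_threaded, slow_job)

started = time.monotonic()
worker = run_threaded(slow_job)
elapsed = time.monotonic() - started  # the caller is not blocked by the job
worker.join()                         # only needed here so the demo can print the result
print(results)
```

The trade-off is that overlapping runs become possible, so the job itself has to be safe to run concurrently.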
Using Timeloop
https://pypi.org/project/timeloop
Timeloop is a library for running periodic tasks at a fixed interval. It uses the decorator pattern to tag functions as jobs and runs them in threads.
Before you start, install the timeloop module using pip.
pip install timeloop
    import time
    from timeloop import Timeloop
    from datetime import timedelta

    tl = Timeloop()

    # Tag job1 to run every 3 seconds
    @tl.job(interval=timedelta(seconds=3))
    def job1():
        print("In job1 ...")

    tl.start()

    # Keep main loop active until interrupted, then stop the job threads
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            tl.stop()
            break
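The decorator-plus-threads idea can be sketched with the standard library alone. The `MiniLoop` class below is a hypothetical, simplified stand-in written for illustration; it is not Timeloop's actual implementation and its method names are assumptions.

```python
import threading
import time
from datetime import timedelta


class MiniLoop:
    """Hypothetical, simplified decorator-based periodic runner."""

    def __init__(self):
        self._jobs = []                 # (function, interval) pairs collected by @job
        self._threads = []
        self._stop = threading.Event()

    def job(self, interval):
        def decorator(func):
            self._jobs.append((func, interval))
            return func                 # the function itself is left unchanged
        return decorator

    def _run(self, func, interval):
        # Event.wait returns False on timeout (run the job) and True once stop is set.
        while not self._stop.wait(interval.total_seconds()):
            func()

    def start(self):
        for func, interval in self._jobs:
            thread = threading.Thread(target=self._run, args=(func, interval), daemon=True)
            thread.start()
            self._threads.append(thread)

    def stop(self):
        self._stop.set()
        for thread in self._threads:
            thread.join()


loop = MiniLoop()
calls = []


@loop.job(interval=timedelta(seconds=0.1))
def tick():
    calls.append(time.monotonic())


loop.start()
time.sleep(0.35)   # let the job fire a few times
loop.stop()
print(len(calls))
```

Waiting on an event rather than calling `time.sleep` lets `stop()` interrupt the wait promptly instead of waiting out a full interval.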