Friday, April 30, 2021

Scheduling jobs in Python


When developing applications and microservices, we run into scenarios where there is a need to run scheduled tasks. Examples include cleaning up temporary files, performing a daily backup, polling a job queue, gathering metrics periodically, and sending emails.

Python supports multiple approaches to running scheduled or recurring jobs. Let us explore some of these in this article.

Using Advanced Python Scheduler (APScheduler)

https://apscheduler.readthedocs.io/en/stable/ 

Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically. APScheduler can also store jobs in a database, which helps them maintain their state and survive scheduler restarts.

Before you start, install the apscheduler module using pip.

pip install apscheduler
APScheduler has four kinds of components:
  • Triggers contain the scheduling logic.
  • Job stores house the scheduled jobs. The default job store simply keeps the jobs in memory, but others store them in various kinds of databases.
  • Executors handle the running of the jobs.
  • Schedulers bind the rest together. You typically have only one scheduler running in your application. A minimal sketch of how these components fit together is shown below.
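All four components can be configured explicitly when the scheduler is constructed. The following is a minimal sketch, assuming APScheduler 3.x; the thread-pool executor, in-memory job store and cron trigger are illustrative choices, not requirements.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.triggers.cron import CronTrigger

# Job to perform
def nightly_cleanup():
    print("Cleaning up temporary files ...")

# Explicit job store, executor and job defaults (illustrative values)
job_stores = {'default': MemoryJobStore()}
executors = {'default': ThreadPoolExecutor(10)}
job_defaults = {'coalesce': True, 'max_instances': 1}

scheduler = BackgroundScheduler(jobstores=job_stores,
                                executors=executors,
                                job_defaults=job_defaults)

# Cron trigger: run the job every day at 02:00
scheduler.add_job(nightly_cleanup, CronTrigger(hour=2, minute=0))
scheduler.start()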
#1 BackgroundScheduler runs the scheduler in the background within the application. In the simplest case, register a job function with the scheduler and specify a time interval. To limit the number of concurrently executing instances of a job, set max_instances in the job defaults.
from apscheduler.schedulers.background import BackgroundScheduler
import time

# Job to perform
def worker_function():
    print("In worker function .. started")

job_defaults = {
    'max_instances': 1
}

# Create and start the background scheduler
scheduler = BackgroundScheduler(job_defaults=job_defaults)
scheduler.add_job(worker_function, 'interval', seconds=5)
scheduler.start()

# Keep main loop active
while True:
    time.sleep(10)
The background scheduler triggers worker_function every 5 seconds, which produces the following output.
In worker function .. started
In worker function .. started
In worker function .. started
In worker function .. started
In worker function .. started
In worker function .. started
....
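In these examples the main thread is kept alive with an endless sleep loop. If the process is expected to be stopped with Ctrl+C, one option is to shut the scheduler down cleanly; a minimal sketch of this (the try/except wrapper is an addition to the example above, not part of it):
try:
    while True:
        time.sleep(10)
except (KeyboardInterrupt, SystemExit):
    # Stop the scheduler and wait for currently running jobs to finish
    scheduler.shutdown()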
#2 Let us tweak the worker function to include a delay of 10 seconds to validate the max_instances setting.
from apscheduler.schedulers.background import BackgroundScheduler
import time

# Job to perform
def worker_function():
    print("In worker function .. started")
    time.sleep(10)

job_defaults = {
    'max_instances': 1
}

# Create and start the background scheduler
scheduler = BackgroundScheduler(job_defaults=job_defaults)
scheduler.add_job(worker_function, 'interval', seconds=5)
scheduler.start()

# Keep main loop active
while True:
    time.sleep(10)
The background scheduler triggers worker_function every 5 seconds. It skips a run whenever it hits the max_instances limit. You will observe the following output.
In worker function .. started
Execution of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 12:55:26 IST)" skipped: maximum number of running instances reached (1)
Execution of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 12:55:31 IST)" skipped: maximum number of running instances reached (1)
In worker function .. started
#3 Let us modify the same program to add a job store in MongoDB to bring in job persistence.
Here we use MongoDBJobStore to add job persistence. This creates a database "apscheduler" and a collection "jobs" in the MongoDB instance. When a job is scheduled in a persistent job store and the scheduler is shut down and restarted after the job was supposed to execute, the job is considered to have "misfired". The scheduler then checks each missed execution time to see if the execution should still be triggered, which can lead to the job being executed several times in succession.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
import time

def worker_function():
    print("In worker function .. started")

job_stores = {
    'default': MongoDBJobStore(database='apscheduler', collection='jobs', host='localhost', port=27017)
}

job_defaults = {
    'coalesce': False,
    'max_instances': 1
}
scheduler = BackgroundScheduler(jobstores=job_stores, job_defaults=job_defaults)
scheduler.add_job(worker_function, 'interval', seconds=5)
scheduler.start()

while True:
    time.sleep(10)
Run the above program, and after a few triggers of the worker function, stop it. Now wait for 30 seconds and then restart the program. You will observe that, due to job persistence, all the missed runs are triggered. To avoid multiple missed jobs being executed in succession, set the coalesce flag to True, as shown in the sketch after the output below.
Run time of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 14:57:25 IST)" was missed by 0:00:11.656328
Run time of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 14:57:25 IST)" was missed by 0:00:06.656372
Run time of job "worker_function (trigger: interval[0:00:05], next run at: 2021-04-30 14:57:25 IST)" was missed by 0:00:01.656415
In worker function .. started
In worker function .. started
In worker function .. started
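A minimal sketch of that change, relative to the program above; the misfire_grace_time value is an illustrative addition that drops a missed run entirely once it is too late:
job_defaults = {
    'coalesce': True,            # roll all missed runs into a single run after a restart
    'max_instances': 1,
    'misfire_grace_time': 30     # optional: drop a run that is more than 30 seconds late
}
scheduler = BackgroundScheduler(jobstores=job_stores, job_defaults=job_defaults)
With coalesce enabled, the scheduler fires the missed job only once after the restart instead of once per missed interval.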

Using schedule

https://pypi.org/project/schedule/
The Python schedule module provides a simple, easy-to-use API for scheduling jobs. It is very lightweight and has no external dependencies. The library is designed to be a simple solution for simple scheduling problems.

The schedule module is not recommended if the following capabilities are needed:
  • Job persistence 
  • Exact timing (sub-second precision execution) 
  • Concurrent execution (multiple threads) 
  • Localization (time zones, workdays or holidays)

Before you start, install the schedule module using pip.

pip install schedule
A simple example using the schedule module to trigger jobs at specific intervals:
import schedule
import time

def job1():
    print("I'm job1 ...")

def job2():
    print("I'm job2 ...")

schedule.every(3).seconds.do(job1)
schedule.every(5).seconds.do(job2)

while True:
    schedule.run_pending()
    time.sleep(1)
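Beyond fixed intervals, schedule also supports day-of-week and time-of-day rules and tag-based cancellation. A brief sketch; the job function and tag names are illustrative:
import schedule

def report_job():
    print("Generating daily report ...")

# Run every day at a specific time of day
schedule.every().day.at("10:30").do(report_job).tag('reports')

# Run every Monday
schedule.every().monday.do(report_job).tag('reports')

# Cancel all jobs carrying a given tag
schedule.clear('reports')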

Using Timeloop

https://pypi.org/project/timeloop
Timeloop is a service that can be used to run periodic tasks at a certain interval. It uses the decorator pattern to run tagged functions in threads.

Before you start, install the timeloop module using pip.

pip install timeloop
A simple example using the Timeloop module to trigger jobs at specific intervals:
import time

from timeloop import Timeloop
from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=3))
def job1():
    print("In job1 ...")

tl.start()

while True:
    try:
        time.sleep(1)
    except KeyboardInterrupt:
        tl.stop()
        break
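As an alternative to the manual keep-alive loop, Timeloop's start method accepts a block argument; with block=True the call itself keeps the main thread alive and stops the registered jobs on Ctrl+C. A brief sketch, assuming the current timeloop release:
# Block the main thread until interrupted; registered jobs are stopped on exit
tl.start(block=True)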