Trong dự án công trình lúc này của tôi Lúc cho tới phần scaling khối hệ thống thì bản vẽ xây dựng lúc này theo phía microservice bắt gặp nên một vấn đề: từng service nhập khối hệ thống đều tương tác thẳng với database nên xẩy ra yếu tố diện tích lớn service thì sẽ càng nhiều liên kết cho tới database kéo đến biểu hiện xẩy ra deadlock, performance cũng khá chậm rãi vì thế những liên kết cho tới database kể từ những service nên ngóng nhau hóa giải.
Sau Lúc được khêu gợi ý về sự gửi sang trọng sử dụng mặt hàng đợi thay cho nhằm những service thao tác thẳng với database, bản thân với dành riêng thời hạn dò la hiểu tăng về bản vẽ xây dựng Queue. Do dự án công trình chạy hầu hết vì như thế python nên tech lead khêu gợi ý dùng Celery, một khối hệ thống vận hành queue thịnh hành.
Bạn đang xem: celery là gì
Kiến trúc sau khoản thời gian gửi sang trọng dùng queue nhập khối hệ thống của tôi tiếp tục như sau. Một nội dung bài viết khá cụ thể về một dạng kiến thiết queue là message queue người xem rất có thể tìm hiểu thêm ở toidicodedao
Về Celery
- Là một khối hệ thống vận hành mặt hàng đợi xử lý task thời hạn thực. Trong khối hệ thống Celery tất cả chúng ta tiếp tục dùng định nghĩa task tương tự như job ở một trong những framework khác ví như Sidekiq.
- Input của celery cần thiết liên kết với cùng một loại message broker còn output rất có thể liên kết cho tới một khối hệ thống backend nhằm tàng trữ kết quả
Mọi người rất có thể xem thêm một nội dung bài viết không giống về Celery bên trên viblo ở trên đây. Dường như Celery cũng đều có một khối hệ thống document cụ thể và đọc dễ ở trang chủ https://docs.celeryproject.org/en/latest/getting-started/introduction.html.
Các Việc nên dùng Celery
- Chạy background jobs
- Chạy những job lập lịch
- Tính toán phân tán
- Xử lý tuy vậy song
Các tác dụng chủ yếu Celery cung cấp
- Monitor: giám sát những job/task được tiến hành queue
- Scheduling: chạy những task lập lịch (giống cronjob)
- Workflows: tạo nên một luồng xử lý task
- Time & Rate Limits: trấn áp con số task được thực đua nhập một khoảng tầm thời hạn, thời hạn một task được chạy,...
- Resource Leak Protection: trấn áp khoáng sản nhập quy trình xử lý task
- User Component: được cho phép người tiêu dùng tự động customize những worker.
Cơ chế của Celery
- Celery sinh hoạt dựa vào định nghĩa task queue. Đây là hình thức queue dùng làm điều phối những job/work trong những máy không giống nhau. Các worker tiếp tục nhận task, chạy task và trả về thành quả.
- Input của queue:
- Task
- Các process bên trên từng worker tiếp tục theo đòi dõi queue nhằm thực đua những task vừa mới được đẩy nhập queue
- Celery thông thường sử dụng một message broker nhằm điều phối task trong những clients và worker. Để tạo nên một task mới mẻ client tiếp tục thêm 1 message nhập queue, broker tiếp sau đó tiếp tục gửi message này cho tới worker. Celery tương hỗ 3 loại broker:
- RabbitMQ
- Redis
- SQS
- Một khối hệ thống dùng celery rất có thể có rất nhiều workers và brokers, vì vậy việc scale theo hướng ngang tiếp tục vô cùng đơn giản và dễ dàng.
Các module chủ yếu của Celery
Application
-
Một instance được khởi tạo nên kể từ tủ sách Celery được gọi là application
-
Nhiều Celery application rất có thể nằm trong tồn bên trên nhập một process
-
Khởi tạo nên một celery application:
from celery import Celery app = Celery()
-
Khi gửi một message cho tới queue, message này sẽ chỉ chứa chấp thương hiệu của task cần thiết thực đua.
-
Các celery worker tiếp tục map thân thích thương hiệu của task với hàm thực đua task tê liệt, việc mapping như thế được gọi là
task registry
@app.task def add(x, y): return x + y
Tasks
- Task nhập Celery với nhì trọng trách chính:
- định nghĩa những gì tiếp tục xẩy ra sau khoản thời gian một task được gọi (gửi cút message)
- định nghĩa những gì tiếp tục xẩy ra Lúc một worker có được message đó
- Mỗi task với cùng một thương hiệu riêng rẽ ko trùng lặp, thương hiệu này sẽ tiến hành refer nhập message nhằm worker rất có thể tìm ra đích thị hàm nhằm thực đua. Nếu ko khái niệm thương hiệu cho tới task thì task tê liệt sẽ tiến hành tự động gọi là nhờ vào module tuy nhiên task được khái niệm và thương hiệu function của task.
- Các message của task tiếp tục không xẩy ra xóa ngoài queue chừng nào là message tê liệt không được một worker xử lý. Một worker rất có thể xử lý nhiều message, nếu như worker bị crash tuy nhiên ko xử lý không còn những message tê liệt thì bọn chúng vẫn rất có thể được gửi lại cho tới một worker khác
- Các function của task nên ở tình trạng idempotent: function không khiến rời khỏi tác động gì bao gồm Lúc với bị gọi rất nhiều lần với và một thông số => một task tiếp tục thực đua tiếp tục đáp ứng không xẩy ra chạy lại lần tiếp nữa.
Tạo task
-
Để tạo nên task tất cả chúng ta sử dụng decorator
@task
from models import User @app.task(name='create_new_user') def create_user(username, password): User.objects.create(username=username, password=password)
-
Để task rất có thể retry tất cả chúng ta rất có thể bound task nhập chủ yếu instance của nó
@task(bind=True) def add(self, x, y): logger.info(self.request.id)
-
Task cũng rất có thể kế tiếp thừa
import celery class MyTask(celery.Task): def on_failure(self, exc, task_id, args, kwargs, einfo): print('{0!r} failed: {1!r}'.format(task_id, exc)) @task(base=MyTask) def add(x, y): raise KeyError()
-
Để hiểu thêm vấn đề và tình trạng của task tất cả chúng ta rất có thể dùng
Task.request
@app.task(bind=True) def dump_context(self, x, y): print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format( self.request))
-
Celery vận hành tình trạng của tasks và rất có thể lưu bọn chúng trong số khối hệ thống gọi là result backend. Vòng đời khoác toan của task nhập Celery gồm:
-
PENDING: task đợi được thực đua.
-
STARTED: task tiếp tục khởi chạy
-
SUCCESS: task tiếp tục chạy trở thành công
-
FAILURE: task bắt gặp lỗi sau khoản thời gian khởi chạy
-
RETRY: task đang rất được chạy lại
-
REVOKED: task được tịch thu lại
-
Ngoài những tình trạng khoác toan bên trên tất cả chúng ta rất có thể tự động khái niệm tăng tình trạng và update tình trạng cho tới task vì như thế method
update_state
Xem thêm: pull your socks up là gì
@app.task(bind=True) def upload_files(self, filenames): for i, file in enumerate(filenames): if not self.request.called_directly: self.update_state(state='PROGRESS', meta={'current': i, 'total': len(filenames)})
-
Gọi task
-
Celery hỗ trợ những API nhằm gọi task sau khoản thời gian tiếp tục khái niệm bọn chúng phía trên.
-
3 method chính:
apply_async
: gửi task message.delay
: gửi task messagecalling
: task message sẽ không còn được gửi tiếp cận worker tuy nhiên task sẽ tiến hành thực đua luôn luôn vì như thế process lúc này.
-
Có một task như sau:
@app.task def add(x, y): return x + y
-
Để gọi task này tất cả chúng ta tiếp tục demo sử dụng 2 method là
apply_async
vàdelay
- Với delay tất cả chúng ta tiếp tục viết lách như sau:
# task.delay(arg1, arg2, kwarg1='x', kwarg2='y') add.delay(10, 5) add.delay(a=10, b=5)
- Dùng
apply_async
thì nên viết lách phức tạp rộng lớn một chút
# task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'}) add.apply_async(queue='low_priority', args=(10, 5)) add.apply_async(queue='high_priority', kwargs={'a': 10, 'b': 5})
-
Về thực chất
delay
vàapply_async
là như nhau tuy nhiêndelay
tiếp tục có trước những thiết lập khoác toan và tất cả chúng ta chỉ rất có thể truyền nhập những thông số đề nghị tiếp tục khái niệm nhập function của task, còn vớiapply_async
tất cả chúng ta rất có thể truyền tăng những thông số khác ví như queue tất cả chúng ta mong muốn gửi message nhập,.... Best practice là nên dùngapply_async
nhằm tiện việc config chạy task tùy từng yêu cầu dùng. -
Celery tương hỗ việc gọi task theo hình thức chaining, thành quả của task này rất có thể được truyền nhập task tiếp theo
add.apply_async((2, 2), link=add.s(16)) # 20
- Nhờ nhập hình thức này tất cả chúng ta rất có thể kiến thiết callback cho tới task như sau
@app.task def error_handler(uuid): result = AsyncResult(uuid) exc = result.get(propagate=False) print('Task {0} raised exception: {1!r}\n{2!r}'.format( uuid, exc, result.traceback))
add.apply_async((2, 2), link_error=error_handler.s())
Sử dụng Celery
Cài đặt
pip install -U Celery
Sử dụng
-
Lựa lựa chọn loại message broker phù phù hợp với dự án công trình. Như tiếp tục trình bày phía trên Celery tương hỗ 3 loại message broker là RabbitMQ, Redis, SQS. Mình tiếp tục cút sâu sắc nhập phân tách từng loại message broker nhập phần sau về Celery.
-
Tạo một celery worker với task
add
from celery Import Celery app = Celery('name of module', broker='url_of_broker') @app.task def add(x, y): return x + y
-
Chạy worker
$ celery -A tasks worker --loglevel=info
-
Gọi task
>>> from tasks import add >>> add.delay(4, 4)
Lưu kết quả
-
Celery rất có thể ghi lại tình trạng của tasks nếu như tất cả chúng ta cần thiết theo đòi dõi tasks trong tương lai. Với những khối hệ thống tiến hành task theo đòi cách thức state machine thì việc khối hệ thống cần thiết bắt được luồng tình trạng của task là vô nằm trong cần thiết.
-
Các khối hệ thống celery dùng làm lưu tình trạng task:
- SQLAlchemy
- Memcached
- Redis
-
Để dùng hình thức lưu thành quả nhập Celery tất cả chúng ta khai báo celery worker với thông số
backend
. Tại trên đây bản thân dùng redis cho tất cả việc lưu thành quả task lộn thực hiện message brokerapp = Celery('tasks', backend='redis://localhost', broker='redis://localhost:6379/0')
Cấu hình Celery
-
Cấu hình khoác toan cơ bạn dạng của celery:
## Broker settings. broker_url = 'redis://localhost:6379/0' # List of modules vĩ đại import when the Celery worker starts. imports = ('myapp.tasks',) ## Using the database vĩ đại store task state and results. result_backend = 'db+sqlite:///results.db' task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
-
Best practice: tạo nên một tệp tin config riêng rẽ cho tới celery
celeryconfig.py
broker_url = 'redis://localhost:6379/0://' result_backend = 'rpc://' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True task_routes = { 'tasks.add': 'low-priority', } # routing một task cho tới queue mong chờ muốn
-
Ngoài cơ hội tạo nên tệp tin config bên trên rời khỏi tất cả chúng ta cũng rất có thể config thẳng vì như thế application của Celery
app.conf
app.conf.update(enable_utc=True, timezone='Europe/London',)
Tổng kết
-
Celery không cần thiết phải config nhiều tuy nhiên chỉ việc import kể từ module dùng thẳng như sau
from celery Import Celery app = Celery('name of module', broker='url_of_broker')
-
Worker và client của Celery rất có thể tự động retry
Xem thêm: kick là gì
-
Một process của Celery rất có thể xử lý mặt hàng triệu task nhập một phút với chừng trễ chỉ vài ba miligiây
-
Celery hỗ trợ:
- Message brokers:
- RabbitMQ
- Redis
- SQS
- Xử lý concurrency
- multiprocessing
- multithread
- single thread
- eventlet, gevent
- Lưu trữ thành quả bên trên những hệ thống:
- Amqp
- Redis
- Memcached
- SQLAlchemy
- Amazon S3
- File system
- Serialization
- json
- yaml
Ở phần sau nội dung bài viết bản thân tiếp tục cút sâu sắc rộng lớn về worker nhập Celery và nhì loại message broker tuy nhiên Celery hỗ trợ: SQS - Redis, mặt khác dựng một phần mềm cơ bạn dạng dùng khối hệ thống này.
- Message brokers:
Tham khảo
- https://docs.celeryproject.org/en/latest/
- https://lakemagadiadventures.com/p/gioi-thieu-celery-maGK7mvBlj2
- https://pawelzny.com/python/celery/2017/08/14/celery-4-tasks-best-practices/
Bình luận