
flask - Celery + SQS is receiving the same task twice, with the same task id, at the same time

I am using Celery with SQS in a Flask app, but Celery is receiving the same task twice, with the same task id, at the same time.

I am running the worker like this:
celery worker -A app.jobs.run -l info --pidfile=/var/run/celery/celery.pid --logfile=/var/log/celery/celery.log --time-limit=7200 --concurrency=8

Here are the Celery logs:

[2019-11-29 08:07:35,464: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,465: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,471: WARNING/ForkPoolWorker-4] in booking funtion1
[2019-11-29 08:07:35,473: WARNING/ForkPoolWorker-3] in booking funtion1
[2019-11-29 08:07:35,537: WARNING/ForkPoolWorker-3] book_request_pp
[2019-11-29 08:07:35,543: WARNING/ForkPoolWorker-4] book_request_pp

Both are running simultaneously.

I am using celery==4.4.0rc4, boto3==1.9.232, and kombu==4.6.6 with SQS in a Python Flask app.

In SQS, the default visibility timeout is 30 minutes, and my task has no ETA and no ack.
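For reference, the visibility timeout that Celery's SQS transport uses, and whether a task is acknowledged before or after it runs, are controlled by the broker_transport_options and task_acks_late settings. Below is a minimal sketch of how those keys could sit alongside the config.py shown further down; the values are illustrative assumptions, not a confirmed fix:

# Illustrative Celery settings (assumed values, for reference only).
broker_transport_options = {
    'visibility_timeout': 1800,  # seconds a delivered message stays hidden from other consumers
    'polling_interval': 1,       # seconds between SQS polls
}

# Default behaviour: messages are acknowledged when received, not when the task finishes.
task_acks_late = False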

My task.py:

from flask import request
from app import app as flask_app
from app.jobs.run import capp
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy(flask_app)

class BookingTasks:
    def addBookingToTask(self):
        # Read the incoming booking payload and hand it off to the Celery task.
        request_data = request.json
        print ('in addBookingToTask', request_data['request_id'])
        print (request_data)
        bookFlightTask.delay(request_data)
        return 'addBookingToTask added'

@capp.task(max_retries=0)
def bookFlightTask(request_data):
    task_id = capp.current_task.request.id
    try:
        print ('in booking funtion1')
        # ... rest of the booking logic omitted

My config file, config.py:

import os
from urllib.parse import quote_plus

aws_access_key = quote_plus(os.getenv('AWS_ACCESS_KEY'))
aws_secret_key = quote_plus(os.getenv('AWS_SECRET_KEY'))

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
imports = ('app.jobs.run',)


## Using the database to store task state and results.
result_backend = 'db' + '+' + os.getenv('SQLALCHEMY_DATABASE_URI')

And lastly, my Celery app file, run.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from flask import Flask
from app import app as flask_app
import sqlalchemy
capp = Celery()

capp.config_from_object('app.jobs.config')

# Optional configuration, see the Celery application user guide.
capp.conf.update(
    result_expires=3600,
)

# SQS_QUEUE_NAME is something like 'celery_test.fifo'; the .fifo suffix is required
capp.conf.task_default_queue = os.getenv('FLIGHT_BOOKINNG_SQS_QUEUE_NAME')
if __name__ == '__main__':
    capp.start()
Asked by ssnitish, translated from Stack Overflow.


1 Reply

Waiting for an expert to answer.
