如何从Java等非python语言调用芹菜任务延迟函数?

 爱得书签_902 发布于 2023-02-07 14:41

我已经在3台机器上安装了celery + rabbitmq.我还创建了一个任务,它根据文件中的数据生成正则表达式,并使用该信息来解析文本.

from celery import Celery

celery = Celery('tasks', broker='amqp://localhost//')
import re

@celery.task
def add(x, y):
     return x + y


def get_regular_expression():
    with open("text") as fp:
        data = fp.readlines()
    str_re = "|".join([x.split()[2] for x in data ])
    return str_re    



@celery.task
def analyse_json(tw):
    str_re = get_regular_expression()
    re.match(str_re,tw.text) 

我可以使用以下python代码轻松调用此任务: -

from tasks import analyse_tweet_json
x = tweet ## load from a file (x is a json)
analyse_tweet_json.delay(x) 

但是,现在我想从Java而不是python进行相同的调用.我不确定做同样事情的最简单方法是什么.

我已经编写了这段代码,用于向AMQP代理发送消息.代码运行正常,但任务没有执行.我不知道如何指定应该执行的任务的名称.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

class try1 {
public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "celery", "celery");
    String messageBody = "{\"text\":\"i am good\"}" ;
    byte[] msgBytes = messageBody.getBytes("ASCII") ;
    channel.basicPublish(queueName, queueName,
            new AMQP.BasicProperties
            ("application/json", null, null, null,
                    null, null, null, null,
                    null, null, null, "guest",
                    null, null),messageBody.getBytes("ASCII")) ;
    connection.close();    

}}

这是rabbitMq的错误日志中的输出: -

connection <0.14627.0>, channel 1 - error:
{amqp_error,not_found,
"no exchange 'amq.gen-gEV47GX9pF_oZ-0bEnOazE' in vhost '/'",
'basic.publish'}

任何帮助将不胜感激.

谢谢,阿米特

1 个回答
  • 有几个问题.

    1)String queueName = channel.queueDeclare().getQueue()命令返回错误的队列名称.我将queuename更改为"芹菜",它运行良好.2)json的格式必须是这种类型: - {"id":"4cc7438e-afd4-4f8f-a2f3-f46567e7ca77","task":"celery.task.PingTask","args":[], "kwargs":{},"retries":0,"eta":"2009-11-17T12:30:56.527191"}

    如http://docs.celeryproject.org/en/latest/internals/protocol.html中所示

    在这两个变化之后,它运作良好.

    -Amit

    2023-02-07 14:44 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有