热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

celery爬虫使用

简介celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它由三部分组成,消息中间件,

简介

celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。

它由三部分组成,消息中间件,任务执行单元任务执行结果存储组成。

官网 :http://www.celeryproject.org/                 下载:pip install celery

消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。
任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。使用 Celery 之前请务必理解以下概念:
a. Celery Beat: 任务调度器,Beat 进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
b. Celery Worker: 执行任务的消费者,通常会在多台服务器运行多个消费者来提高运行效率。
c. Broker: 消息代理,也是任务队列本身(通常是消息队列或者数据库),通常称为消息中间件,接收任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方。
d. Producer: 任务生产者,调用 Celery API 的函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

项目结构如下:

其中,app_test.py为主程序,其代码如下:

from celery import Celeryapp = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')if __name__ == '__main__':app.start()

tasks.py为任务函数,代码如下:

import re
import requests
from celery import group
from proj.app_test import app@app.task(trail=True)
# 并行调用任务
def get_content(urls):return group(C.s(url) for url in urls)()@app.task(trail=True)
def C(url):return parser.delay(url)@app.task(trail=True)
# 获取每个网页的name和description
def parser(url):req = requests.get(url)html = req.texttry:name = re.findall(r'(.+?)', html)[0]desc = re.findall(r'(.+?)', html)[0]if name is not None and desc is not None:return name, descexcept Exception as err:return '', ''

celeryconfig.py为celery的配置文件,代码如下:

BROKER_URL = 'redis://localhost' # 使用Redis作为消息代理CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任务结果存在了RedisCELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSONCELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型

最后是我们的爬虫文件,scrapy.py,代码如下:

import time
import requests
from bs4 import BeautifulSoup
from proj.tasks import get_contentt1 = time.time()
url = "http://www.wikidata.org/w/index.php?title=Special:WhatLinksHere/Q5&limit=500&from=0"headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, \like Gecko) Chrome/67.0.3396.87 Safari/537.36'}req = requests.get(url, headers=headers)
soup = BeautifulSoup(req.text, "lxml")
human_list = soup.find(id='mw-whatlinkshere-list')('li')urls = []
for human in human_list:url = human.find('a')['href']urls.append('https://www.wikidata.org'+url)
#print(urls)# 调用get_content函数,并获取爬虫结果
result = get_content.delay(urls)
res = [v for v in result.collect()]for r in res:if isinstance(r[1], list) and isinstance(r[1][0], str):print(r[1])t2 = time.time() # 结束时间
print('耗时:%s' % (t2 - t1))

在后台启动redis,并切换至proj项目所在目录,运行命令:

celery -A proj.app_test worker -l info

输出结果如下(只显示最后几行的输出):

......
['Antoine de Saint-Exupery', 'French writer and aviator']
['', '']
['Sir John Barrow, 1st Baronet', 'English statesman']
['Amy Johnson', 'pioneering English aviator']
['Mike Oldfield', 'English musician, multi-instrumentalist']
['Willoughby Newton', 'politician from Virginia, USA']
['Mack Wilberg', 'American conductor']
耗时:80.05160284042358

在rdm中查看数据,如下:

 

Celery进阶—分布式爬虫

新建文件crawlertask.py,用于执行数据抓取任务,代码如下。

#coding:utf-8import requests
from bs4 import BeautifulSoup
from celery import Celery,platformsapp = Celery('tasks',broker='redis://localhost:6379/0')
app.conf.CELERY_RESULT_BACKEND='redis://localhost:6379/0'platforms.C_FORCE_ROOT=Truedef format_str(str):return str.replace("\n","").replace(" ","").replace("\t","")@app.task
def get_urls_in_pages(from_page_num,to_page_num):urls=[]search_word='计算机'url_part_1='http://www.phei.com.cn/module/goods/'\'searchkey.jsp?Page='url_part_2='&Page=2&searchKey='for i in range(from_page_num,to_page_num+1):urls.append(url_part_1+str(i)+url_part_2+search_word)all_href_list=[]for url in urls:resp=requests.get(url)bs=BeautifulSoup(resp.text)a_list=bs.find_all('a')needed_list=[]for a in a_list:if 'href' in a.attrs:href_val=a['href']title=a.textif 'bookid' in href_val and 'shopcar0.jsp' not in href_val and title!='':if [title,href_val] not in needed_list:needed_list.append([format_str(title),format_str(href_val)])all_href_list+=needed_listall_href_file = open(str(from_page_num)+'_'+str(to_page_num)+'_'+'all_hrefs.txt','w')for href in all_href_list:all_href_file.write('\t'.join(href)+'\n')all_href_file.close()return len(all_href_list)

【部署服务器方法】

将以上脚本部署到两台云端服务器 , 并且在云端开启redis服务,然后执行:

celery worker -A crawlertask -l info -c 10

【部署从机方法】

将上面的脚本部署到两台主机A和B,然后各自运行下面的命令:

celery -A crawl_douban worker -l info

在本机新建文件task_dist.py用于异步分发任务,代码如下:

from celery import Celery
from threading import Thread
import timeredis_ips={0:'redis://101.200.163.195:6379/0',1:'redis://112.124.28.41:6379/0',2:'redis://112.124.28.41:6379/0',3:'redis://101.200.163.195:6379/0',
}def send_task_and_get_results(ind,from_page,to_page):app=Celery('crawlertask',broker=redis_ips[ind])app.conf.CELERY_RESULT_BACKEND=redis_ips[ind]result=app.send_task('crawlertask.get_urls_in_pages',args=(from_page,to_page))print(redis_ips[ind],result.get())if __name__=='__main__':t1=time.time()page_ranges_lst=[(1,10),(11,20),(21,30),(31,40),]th_lst = []for ind, page_range in enumerate(page_ranges_lst):th = Thread(target=send_task_and_get_results,args=(ind,page_range[0], page_range[1]))th_lst.append(th)for th in th_lst:th.start()for th in th_lst:th.join()t2 = time.time()print("用时:", t2 - t1)

举例:

以爬douban小说为例 首先启动Redis,新建文件crawl_douban.py

import requests
import time
import redis
from celery import Celery
from bs4 import BeautifulSoup
from configparser import ConfigParsercp=ConfigParser()
cp.read('config')#获取配置信息
db_host=cp.get(section='redis',option='db_host')
db_port=cp.getint('redis','db_port')
db_pwd=cp'redis'#redis连接
pool = redis.ConnectionPool(host=db_host, port=db_port, db=15, password=db_pwd)
r = redis.StrictRedis(connection_pool=pool)
set_name='crawl:douban'app = Celery('crawl', include=['task'], broker='redis://:{}@{}:{}/12'.format(db_pwd,db_host,db_port), backend='redis://:{}@{}:{}/13'.format(db_pwd,db_host,db_port))#官方推荐使用json作为消息序列化方式
app.conf.update(CELERY_TIMEZONE='Asia/Shanghai',CELERY_ENABLE_UTC=True,CELERY_ACCEPT_CONTENT=['json'],CELERY_TASK_SERIALIZER='json',CELERY_RESULT_SERIALIZER='json',
)headers={'User-Agent':'',}@app.task
def crawl(url):res=requests.get(url,headers=headers)#延迟2秒time.sleep(2)soup=BeautifulSoup(res.text,'lxml')items=soup.select('.subject-list .subject-item .info h2 a')titles=[item['title'] for item in items]#将小说的title存入redis数据库r.sadd(set_name,(url,titles,time.time()))print(titles)return (url,titles)

将上面的脚本部署到两台主机A和B,然后各自运行下面的命令:

celery -A crawl_douban worker -l info

在本机C新建文件task_dispatcher.py用于异步分发任务,代码如下:

from crawl_douban import app
from crawl_douban import crawldef manage_crawl(urls):for url in urls:app.send_task('crawl_douban.crawl', args=(url,))#上句也可以写成 crawl.apply_async(args=(url,)) 或 crawl.delay(url)if __name__ == '__main__':start_url = 'https://book.douban.com/tag/小说'#爬去10页,每页20本书url_list = ['{}?start={}&type=T'.format(start_url, page * 20) for page in range(10)]manage_crawl(url_list)

运行task_dispatcher.py,跑完用时2.8s

 

celery worker -A tasks --loglevel=info --concurrency=5# 参数”-A”指定了Celery实例的位置
# 参数”loglevel”指定了日志等级,也可以不加,默认为warning。
# 参数”concurrency”指定最大并发数,默认为CPU核数。

 


推荐阅读
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 分享css中提升优先级属性!important的用法总结
    web前端|css教程css!importantweb前端-css教程本文分享css中提升优先级属性!important的用法总结微信门店展示源码,vscode如何管理站点,ubu ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 本文介绍了Redis中RDB文件和AOF文件的保存和还原机制。RDB文件用于保存和还原Redis服务器所有数据库中的键值对数据,SAVE命令和BGSAVE命令分别用于阻塞服务器和由子进程执行保存操作。同时执行SAVE命令和BGSAVE命令,以及同时执行两个BGSAVE命令都会产生竞争条件。服务器会保存所有用save选项设置的保存条件,当满足任意一个保存条件时,服务器会自动执行BGSAVE命令。此外,还介绍了RDB文件和AOF文件在操作方面的冲突以及同时执行大量磁盘写入操作的不良影响。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 如何使用代理服务器进行网页抓取?
    本文介绍了如何使用代理服务器进行网页抓取,并探讨了数据驱动对竞争优势的重要性。通过网页抓取,企业可以快速获取并分析大量与需求相关的数据,从而制定营销战略。同时,网页抓取还可以帮助电子商务公司在竞争对手的网站上下载数百页的有用数据,提高销售增长和毛利率。 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 上图是InnoDB存储引擎的结构。1、缓冲池InnoDB存储引擎是基于磁盘存储的,并将其中的记录按照页的方式进行管理。因此可以看作是基于磁盘的数据库系统。在数据库系统中,由于CPU速度 ... [详细]
  • Centos下安装memcached+memcached教程
    本文介绍了在Centos下安装memcached和使用memcached的教程,详细解释了memcached的工作原理,包括缓存数据和对象、减少数据库读取次数、提高网站速度等。同时,还对memcached的快速和高效率进行了解释,与传统的文件型数据库相比,memcached作为一个内存型数据库,具有更高的读取速度。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
author-avatar
易_拉罐
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有