在上一篇使用 Scrapy 爬取博客园列表中,实现了爬取首页博文列表,并将数据保存到JSON文件中。在示例中,数据的保存放在了提取函数里,这样提取函数同时需要提取数据与文件保存,职责不够单一。针对这个问题Scrapy提供了Pipeline管道技术来实现。
管道示例
使用管道需要通过以下几步来实现:
第一步、在items.py文件中定义Item:
class CnblogsItem(scrapy.Item):
diggnum = scrapy.Field() #推荐数
title = scrapy.Field() #标题
titlelnk = scrapy.Field() #标题链接
summary = scrapy.Field() #摘要
fack = scrapy.Field() #作者头像
author = scrapy.Field() #作者名称
authorlnk = scrapy.Field() #作者主页
time = scrapy.Field() #发表时间
comment = scrapy.Field() #评论数
view = scrapy.Field() #阅读数
第二步、在pipelines.py文件中定义管道:
class CnblogsPipeline(object):
def spider_open(self, spider):
'''爬虫打开后'''
pass
def spider_closed(self, spider):
'''爬虫关闭后'''
pass
def process_item(self, item, spider):
'''item处理函数'''
return item
这是piplines管道的三个重要的方法,其中process_item函数是必须实现的。但是测试发现,在Windows版本Scrapy 1.6.0中,spider_open()和spider_closed()两个方法并不有被调用。如果想实现类似的功能,可以使用构造函数和析构函数来处理,例如:
import json
class CnblogsPipeline(object):
def __init__(self):
'''构造函数'''
self.file = open('pipeline.txt', 'at', encoding='UTF-8')
def __del__(self):
'''析构函数'''
self.file.close()
def process_item(self, item, spider):
'''item处理函数'''
j = json.dumps(item, ensure_ascii=False)
#如果item是scrapy.Item子类,则需要:
#j = json.dumps(dict(item), ensure_ascii=False)
self.file.write(j + "\n")
return item
这样,同样也可以实现,在爬虫开始时,先通过 __init__ 初始化管道,而在爬行结束后,通过 __del__ 函数释放管道资源。
第三步、在settings.py中启用管道
ITEM_PIPELINES = {
'cnblogs.pipelines.CnblogsPipeline': 300,
}
后面的数字决定管道的执行顺序,多个管道时会从小到大的顺序执行。
第四步、在callback中返回dict或scrapy.Item子类
def parse_page(self, response):
posts = response.css(".post_item")
for post in posts:
item = {
'diggnum' : post.css(".diggnum::text").re_first(r'\d+'), #推荐数
'title' : post.css(".titlelnk::text").extract_first(), #标题
'titlelnk' : post.css(".titlelnk::attr(href)").extract_first(), #标题链接
'summary' : "".join(post.css(".post_item_summary::text").extract()), #摘要
'fack' : post.css(".pfs::attr(src)").extract_first(), #作者头像
'author' : post.css(".lightblue::text").extract_first(), #作者名称
'authorlnk' : post.css(".lightblue::attr(href)").extract_first(), #作者主页
'time' : post.css(".post_item_foot::text").re_first(r'\d+-\d+-\d+ \d+:\d+'), #发表时间
'comment' : post.css(".article_comment .gray::text").re_first(r'\d+'), #评论数
'view' : post.css(".article_comment .gray::text").re_first(r'\d+') #阅读数
}
item = self.strip(item)
yield item
这样parse_page()函数成为一个生成器,如果返回的是dict或scrapy.Item子类实例时,就会传递到管道处理。如果使用的是scrapy.Item子类,可以类似这样设置:
def parse_page(self, response):
posts = response.css(".post_item")
for post in posts:
item = CnblogsItem()
item['diggnum'] = post.css(".diggnum::text").re_first(r'\d+') #推荐数
item['title'] = post.css(".titlelnk::text").extract_first() #标题
item['titlelnk'] = post.css(".titlelnk::attr(href)").extract_first() #标题链接
item['summary'] = "".join(post.css(".post_item_summary::text").extract()) #摘要
item['fack'] = post.css(".pfs::attr(src)").extract_first() #作者头像
item['author'] = post.css(".lightblue::text").extract_first() #作者名称
item['authorlnk'] = post.css(".lightblue::attr(href)").extract_first() #作者主页
item['time'] = post.css(".post_item_foot::text").re_first(r'\d+-\d+-\d+ \d+:\d+') #发表时间
item['comment'] = post.css(".article_comment .gray::text").re_first(r'\d+') #评论数
item['view'] = post.css(".article_comment .gray::text").re_first(r'\d+') #阅读数
self.strip(item)
yield item
多个管道的协调
1、执行顺序
在settings中启用管道时指定执行顺序,数值小的先执行:
ITEM_PIPELINES = {
'cnblogs.pipelines.CnblogsPipelineL0': 100,
'cnblogs.pipelines.CnblogsPipelineL1': 200,
}
这样在处理数据时,CnblogsPipelineL0优先于CnblogsPipelineL1。
2、决定是否让其它管道处理
如果当前管道已处理完毕,不需要其它管道处理时,可以在 process_item(self, item, spider) 函数中raise DropItem异常,例如:
def process_item(self, item, spider):
'''item处理函数'''
if(int(item['diggnum']) > 0):
j = json.dumps(item, ensure_ascii=False)
self.file.write(j + "\n")
raise DropItem
else:
return item
这样,当int(item['diggnum']) > 0时,将抛出DropItem异常,将不会再传递给其它管道处理。