不积跬步,无以至千里;不积小流,无以成江海。

Dean's blog

  • Join Us on Facebook!
  • Follow Us on Twitter!
  • LinkedIn
  • Subcribe to Our RSS Feed

使用 Scrapy Pipeline将数据保存到JSON文件

在上一篇使用 Scrapy 爬取博客园列表中,实现了爬取首页博文列表,并将数据保存到JSON文件中。在示例中,数据的保存放在了提取函数里,这样提取函数同时需要提取数据与文件保存,职责不够单一。针对这个问题Scrapy提供了Pipeline管道技术来实现。

管道示例

使用管道需要通过以下几步来实现:

第一步、在items.py文件中定义Item:

class CnblogsItem(scrapy.Item):
    diggnum     = scrapy.Field()    #推荐数
    title       = scrapy.Field()    #标题
    titlelnk    = scrapy.Field()    #标题链接
    summary     = scrapy.Field()    #摘要
    fack        = scrapy.Field()    #作者头像
    author      = scrapy.Field()    #作者名称
    authorlnk   = scrapy.Field()    #作者主页
    time        = scrapy.Field()    #发表时间
    comment     = scrapy.Field()    #评论数
    view        = scrapy.Field()    #阅读数

第二步、在pipelines.py文件中定义管道:

class CnblogsPipeline(object):
    def spider_open(self, spider):
        '''爬虫打开后'''
        pass

    def spider_closed(self, spider):
        '''爬虫关闭后'''
        pass

    def process_item(self, item, spider):
        '''item处理函数'''
        return item

这是piplines管道的三个重要的方法,其中process_item函数是必须实现的。但是测试发现,在Windows版本Scrapy  1.6.0中,spider_open()和spider_closed()两个方法并不有被调用。如果想实现类似的功能,可以使用构造函数和析构函数来处理,例如:

import json

class CnblogsPipeline(object):
    def __init__(self):
        '''构造函数'''
        self.file = open('pipeline.txt', 'at', encoding='UTF-8')

    def __del__(self):
        '''析构函数'''
        self.file.close()

    def process_item(self, item, spider):
        '''item处理函数'''
        j = json.dumps(item, ensure_ascii=False)
        #如果item是scrapy.Item子类,则需要:
        #j = json.dumps(dict(item), ensure_ascii=False)
        self.file.write(j + "\n")
        return item

这样,同样也可以实现,在爬虫开始时,先通过 __init__ 初始化管道,而在爬行结束后,通过  __del__ 函数释放管道资源。

第三步、在settings.py中启用管道

ITEM_PIPELINES = {
    'cnblogs.pipelines.CnblogsPipeline': 300,
}

后面的数字决定管道的执行顺序,多个管道时会从小到大的顺序执行。

第四步、在callback中返回dict或scrapy.Item子类

    def parse_page(self, response):
        posts = response.css(".post_item")
        for post in posts:
            item = {
                'diggnum' : post.css(".diggnum::text").re_first(r'\d+'),                        #推荐数
                'title' : post.css(".titlelnk::text").extract_first(),                          #标题
                'titlelnk' : post.css(".titlelnk::attr(href)").extract_first(),                 #标题链接
                'summary' : "".join(post.css(".post_item_summary::text").extract()),            #摘要
                'fack' : post.css(".pfs::attr(src)").extract_first(),                           #作者头像
                'author' : post.css(".lightblue::text").extract_first(),                        #作者名称
                'authorlnk' : post.css(".lightblue::attr(href)").extract_first(),               #作者主页
                'time' : post.css(".post_item_foot::text").re_first(r'\d+-\d+-\d+ \d+:\d+'),    #发表时间
                'comment' : post.css(".article_comment .gray::text").re_first(r'\d+'),          #评论数
                'view' : post.css(".article_comment .gray::text").re_first(r'\d+')              #阅读数
            }
            item = self.strip(item)
            yield item

这样parse_page()函数成为一个生成器,如果返回的是dict或scrapy.Item子类实例时,就会传递到管道处理。如果使用的是scrapy.Item子类,可以类似这样设置:

def parse_page(self, response):
    posts = response.css(".post_item")
    for post in posts:
        item = CnblogsItem()
        item['diggnum'] = post.css(".diggnum::text").re_first(r'\d+')                        #推荐数
        item['title'] = post.css(".titlelnk::text").extract_first()                          #标题
        item['titlelnk'] = post.css(".titlelnk::attr(href)").extract_first()                 #标题链接
        item['summary'] = "".join(post.css(".post_item_summary::text").extract())            #摘要
        item['fack'] = post.css(".pfs::attr(src)").extract_first()                           #作者头像
        item['author'] = post.css(".lightblue::text").extract_first()                        #作者名称
        item['authorlnk'] = post.css(".lightblue::attr(href)").extract_first()               #作者主页
        item['time'] = post.css(".post_item_foot::text").re_first(r'\d+-\d+-\d+ \d+:\d+')    #发表时间
        item['comment'] = post.css(".article_comment .gray::text").re_first(r'\d+')          #评论数
        item['view'] = post.css(".article_comment .gray::text").re_first(r'\d+')             #阅读数

        self.strip(item)

        yield item

 

多个管道的协调

1、执行顺序

在settings中启用管道时指定执行顺序,数值小的先执行:

ITEM_PIPELINES = {
    'cnblogs.pipelines.CnblogsPipelineL0': 100,
    'cnblogs.pipelines.CnblogsPipelineL1': 200,
}

这样在处理数据时,CnblogsPipelineL0优先于CnblogsPipelineL1。

2、决定是否让其它管道处理

如果当前管道已处理完毕,不需要其它管道处理时,可以在 process_item(self, item, spider) 函数中raise DropItem异常,例如:

def process_item(self, item, spider):
    '''item处理函数'''
    if(int(item['diggnum']) > 0):
        j = json.dumps(item, ensure_ascii=False)
        self.file.write(j + "\n")
        raise DropItem
    else:
        return item

这样,当int(item['diggnum']) > 0时,将抛出DropItem异常,将不会再传递给其它管道处理。

不允许评论
粤ICP备17049187号-1