scrapy作为一个专业的爬虫框架,不仅有完备的爬取工具,同样有完备的数据处理工作流。下面我们通过item和pipeline来介绍scrapy推荐的数据处理流程。
Item数据对象
英语字典中对item的解释是列表或群组中的一项。放在这里就是一条数据记录的对象。比如我们要抓取一个名单,那么item就是一个人。如果是商品列表,那么item就是一个商品。
item的主要作用是规范数据结构,简单粗暴的理解就是去掉多余的字段或补充缺失的字段,让一条数据记录保证一个一致的数据结构。它非常便于接下来存储到行列表的数据结构中(RDB或Excel)。下面定义了一个商品的item:
class ProductItem(scrapy.Item): name = scrapy.Field() price = scrapy.Field() stock = scrapy.Field() tags = scrapy.Field() last_updated = scrapy.Field(serializer=str)
我们可以看到定义过程非常简单,赋值的左侧直接写出我们要保留的字段即可。赋值的右边是清一色的scrapy.Field()即可,只有特殊情况需要定义一下序列化方法。
它的用法同样非常简单,假设我们在API中拿到的一条商品数据包含10个字段,但是我们只需要上面的5个字段,我们无须特殊的处理,直接把这条10个字段的数据传入构造函数即可。item会自动根据对应字段进行填充。
apidata = { "name":"《scrapy教程》", "price":"99.99", "stock":"1", "cover":"https://img.scrapy.xxx/xxxxxxx.jpg", "summary":"这是一本scrapy教程……", "url_buy":"https://shop.scrapy.xxx/xxx.html", "seller":"西门老铁", "created":"2020-02-02", "last_updated":"2021-12-02", "status":"0" } ProductItem(apidata)
看上例的最后一行,只需要把所有的数据传入ProductItem的构造函数即可完成字段的选择和赋值。并且只保留class中定义的5个字段。
Pipeline流水线
流水线应该不难理解,就是用来处理爬虫获得的数据的,所有yield输出的数据都会进入流水线,而且一个爬虫可以定义若干个流水线,比如我抓到一组商品信息,我可以用一个流水线来做一层简单的数据处理(比如去重),然后再把他写入MySQL,同时输出一份Excel报表。我就可以定义三个流水线,①数据处理、②写入MySQL、③写Excel。
流水线可以定义优先级表示执行顺序,但是请注意,这里的顺序是单个item的顺序,而不是一个流水线处理完所有item再启动另一个流水线。
一个pipeline的定义如下
class PricePipeline: vat_factor = 1.15 def process_item(self, item, spider): adapter = ItemAdapter(item) if adapter.get('price'): if adapter.get('price_excludes_vat'): adapter['price'] = adapter['price'] * self.vat_factor return item else: raise DropItem(f"Missing price in {item}")
一个pipeline的类里面最关键的方法便是process_item,用来处理每一个item。除此之外,还有open_spider和close_spider。注意这里open和close的后面是spider,也就是说在爬虫启动和关闭的时候调用的,而不是数据到来的时候。换句话说,在爬虫启动还没有开始抓数据的时候流水线就启动了。这里通常对应一些前置和收尾的动作,比如MySQL连接的open和close可以对应在这里执行,写入Excel文件的open和close。下面是一个MySQL的流水线示例
class WriteMysqlPipeline: def open_spider(self,spider): print('------pipline mysql writer------') self.conn = pymysql.connect(host=spider.db_host, user=spider.db_user, password=spider.db_pwd, database=spider.db_name, charset='utf8mb4') # 创建游标 self.cursor = self.conn.cursor() def process_item(self, item, spider): # sql语句 insert_sql = """ insert into ods_product(name,price,stock,tags,last_updated) VALUES(%s,%s,%s,%s,%s) """ # 执行插入数据到数据库操作 self.cursor.execute(insert_sql,(item['name'], item['price'], item['stock'],item['tags'],item['last_updated'])) return item def close_spider(self,spider): # 提交,不进行提交无法保存到数据库 self.conn.commit() # 关闭游标和连接 self.cursor.close() self.conn.close()
多item多pipeline的协作
很多时候,我们的一个爬虫会得到不止一种数据,这就意味着可能会yield多种item,我们无法指定哪个item只进哪个流水线,只要定义在ITEM_PIPELINES的设置项中的流水线,所有item一定会经过。所以我们需要在process_item中判断我们要处理的item再做处理。但是请注意,通常来说,如果流水线不是过滤器的话,process_item中的每一个分支,我们都要以return item退出,这样item才会被后面的流水线接收到。就像下面这样:
def process_item(self, item, spider): if not isinstance(item, ProductItem): return item #处理ProductItem ... return item
另外,我们的一个scrapy项目中往往也是有多个spider,每个sipder产生的item和对应的流水线都不同,但是settings.py中定义的ITEM_PIPELINES是全局生效的,这时候我们可以关闭settings.py中的配置,然后在对应的spider下面单独配置如下:
class ProductSpider(scrapy.Spider): name = 'product' custom_settings = { 'ITEM_PIPELINES': { 'shop.pipelines.ProductPipeline': 100, 'shop.pipelines.WriteMysqlPipeline': 200, }, }
如上配置就可以控制流水线只在ProductSpider生效,注意shop.pipelines.WriteMysqlPipeline是pipeline的完整类名,一般只需要替换最前面的shop为你的scrapy项目名称即可。