Skip to content

Python 爬虫数据存储

用解析器解析出数据之后,接下来就是存储数据了。保存形式多种多样:

  • 文件存储:TXT、JSON、CSV
  • 关系型数据库:MySQL、SQLite、PostgreSQL
  • 非关系型数据库:MongoDB、Redis

一、文件存储

1.1 TXT 文本存储

TXT 文本几乎兼容任何平台,但不利于检索。适合对检索和数据结构要求不高的场景。

基本写入

python
import requests
from pyquery import PyQuery as pq

url = 'https://www.example.com'
headers = {'User-Agent': 'Mozilla/5.0 ...'}
html = requests.get(url, headers=headers).text
doc = pq(html)

# 写入文件
file = open('data.txt', 'a', encoding='utf-8')
file.write('内容')
file.write('\n' + '=' * 50 + '\n')
file.close()

推荐写法(with as)

python
# 自动关闭文件
with open('data.txt', 'a', encoding='utf-8') as file:
    file.write('内容\n')

文件打开模式

模式描述
r只读(默认),文件指针在开头
w写入,已存在则覆盖,不存在则创建
a追加,文件指针在末尾
rb/wb/ab二进制模式
r+/w+/a+读写模式

1.2 JSON 文件存储

JSON 是轻量级的数据交换格式,结构清晰,是数据交换的极佳方式。

读取 JSON

python
import json

# 从字符串解析
str_data = '''
[{
    "name": "Bob",
    "gender": "male",
    "birthday": "1992-10-18"
}]
'''
data = json.loads(str_data)  # 转为 Python 对象
print(data[0]['name'])  # Bob
print(data[0].get('age', 25))  # 25(默认值)

# 从文件读取
with open('data.json', 'r', encoding='utf-8') as file:
    data = json.loads(file.read())

⚠️ 注意:JSON 字符串必须使用双引号,单引号会导致解析错误。

写入 JSON

python
import json

data = [{'name': 'Bob', 'age': 20}]

with open('data.json', 'w', encoding='utf-8') as file:
    # 格式化输出,支持中文
    file.write(json.dumps(data, indent=2, ensure_ascii=False))

1.3 CSV 文件存储

CSV 以纯文本形式存储表格数据,比 Excel 更简洁。

写入 CSV

python
import csv

# 列表写入
with open('data.csv', 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['id', 'name', 'age'])  # 表头
    writer.writerow(['10001', 'Mike', 20])
    writer.writerows([['10002', 'Bob', 22], ['10003', 'Jordan', 21]])

# 字典写入
with open('data.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['id', 'name', 'age']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({'id': '10001', 'name': 'Mike', 'age': 20})

读取 CSV

python
import csv

with open('data.csv', 'r', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    for row in reader:
        print(row)  # ['id', 'name', 'age']

# 使用 pandas(推荐)
import pandas as pd
df = pd.read_csv('data.csv')
print(df)

二、MySQL 存储

MySQL 是最常用的关系型数据库,Python 通过 PyMySQL 库操作。

2.1 连接数据库

python
import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    port=3306,
    db='spiders'
)
cursor = db.cursor()

2.2 创建表

python
sql = '''
CREATE TABLE IF NOT EXISTS students (
    id VARCHAR(255) NOT NULL,
    name VARCHAR(255) NOT NULL,
    age INT NOT NULL,
    PRIMARY KEY (id)
)
'''
cursor.execute(sql)
db.close()

2.3 插入数据

python
# 基础插入
sql = 'INSERT INTO students(id, name, age) VALUES (%s, %s, %s)'
try:
    cursor.execute(sql, ('20120001', 'Bob', 20))
    db.commit()  # 必须提交
except:
    db.rollback()  # 回滚

# 通用插入(字典动态构造)
data = {'id': '20120001', 'name': 'Bob', 'age': 20}
table = 'students'
keys = ', '.join(data.keys())
values = ', '.join(['%s'] * len(data))
sql = f'INSERT INTO {table}({keys}) VALUES ({values})'

try:
    cursor.execute(sql, tuple(data.values()))
    db.commit()
except:
    db.rollback()

2.4 更新数据(去重)

python
# 主键存在则更新,不存在则插入
sql = '''
INSERT INTO {table}({keys}) VALUES ({values})
ON DUPLICATE KEY UPDATE
'''.format(table=table, keys=keys, values=values)

update = ', '.join([f'{key} = %s' for key in data])
sql += update

try:
    cursor.execute(sql, tuple(data.values()) * 2)
    db.commit()
except:
    db.rollback()

2.5 查询数据

python
sql = 'SELECT * FROM students WHERE age >= 20'
try:
    cursor.execute(sql)
    print('Count:', cursor.rowcount)
    
    # 获取单条
    one = cursor.fetchone()
    
    # 获取全部(大数据量慎用)
    results = cursor.fetchall()
    
    # 推荐:逐条获取
    row = cursor.fetchone()
    while row:
        print(row)
        row = cursor.fetchone()
except:
    print('Error')

2.6 删除数据

python
sql = 'DELETE FROM students WHERE age > 20'
try:
    cursor.execute(sql)
    db.commit()
except:
    db.rollback()

三、MongoDB 存储

MongoDB 是基于分布式文件存储的非关系型数据库,内容存储形式类似 JSON,非常灵活。

3.1 连接与初始化

python
import pymongo

# 连接
client = pymongo.MongoClient(host='localhost', port=27017)
# 或使用连接字符串
# client = pymongo.MongoClient('mongodb://localhost:27017/')

# 指定数据库
db = client.test  # 或 client['test']

# 指定集合(类似表)
collection = db.students  # 或 db['students']

3.2 插入数据

python
student = {
    'id': '20170101',
    'name': 'Jordan',
    'age': 20,
    'gender': 'male'
}

# 插入单条(推荐)
result = collection.insert_one(student)
print(result.inserted_id)

# 插入多条
result = collection.insert_many([student1, student2])
print(result.inserted_ids)

3.3 查询数据

python
# 查询单条
result = collection.find_one({'name': 'Mike'})

# 根据 ObjectId 查询
from bson.objectid import ObjectId
result = collection.find_one({'_id': ObjectId('593278c115c2602667ec6bae')})

# 查询多条
results = collection.find({'age': 20})
for result in results:
    print(result)

# 条件查询
results = collection.find({'age': {'$gt': 20}})  # 大于 20
results = collection.find({'name': {'$regex': '^M.*'}})  # 正则匹配

比较符号

符号含义示例
$lt小于{'age': {'$lt': 20}}
$gt大于{'age': {'$gt': 20}}
$lte小于等于{'age': {'$lte': 20}}
$gte大于等于{'age': {'$gte': 20}}
$ne不等于{'age': {'$ne': 20}}
$in在范围内{'age': {'$in': [20, 23]}}
$regex正则匹配{'name': {'$regex': '^M.*'}}

3.4 计数、排序、分页

python
# 计数
count = collection.find({'age': 20}).count()

# 排序
results = collection.find().sort('name', pymongo.ASCENDING)  # 升序
results = collection.find().sort('name', pymongo.DESCENDING)  # 降序

# 分页
results = collection.find().sort('name', pymongo.ASCENDING).skip(2).limit(2)

3.5 更新数据

python
condition = {'name': 'Kevin'}

# 更新单条(推荐)
result = collection.update_one(condition, {'$set': {'age': 26}})
print(result.matched_count, result.modified_count)

# 更新多条
result = collection.update_many({'age': {'$gt': 20}}, {'$inc': {'age': 1}})

3.6 删除数据

python
# 删除单条
result = collection.delete_one({'name': 'Kevin'})

# 删除多条
result = collection.delete_many({'age': {'$lt': 25}})
print(result.deleted_count)

四、Redis 存储

Redis 是基于内存的高效键值型数据库,存取效率极高,支持多种数据结构。

4.1 连接 Redis

python
from redis import StrictRedis, ConnectionPool

# 方式一:直接连接
redis = StrictRedis(host='localhost', port=6379, db=0, password='foobared')

# 方式二:连接池
pool = ConnectionPool(host='localhost', port=6379, db=0, password='foobared')
redis = StrictRedis(connection_pool=pool)

# 方式三:URL 连接
url = 'redis://:foobared@localhost:6379/0'
pool = ConnectionPool.from_url(url)
redis = StrictRedis(connection_pool=pool)

# 测试
redis.set('name', 'Bob')
print(redis.get('name'))  # b'Bob'

4.2 字符串操作

python
redis.set('name', 'Bob')          # 设置值
redis.get('name')                 # 获取值
redis.getset('name', 'Mike')      # 设置新值并返回旧值
redis.mset({'name1': 'A', 'name2': 'B'})  # 批量设置
redis.mget(['name1', 'name2'])    # 批量获取
redis.incr('age', 1)              # 自增
redis.decr('age', 1)              # 自减
redis.append('name', 'OK')        # 追加字符串

4.3 列表操作

python
redis.rpush('list', 1, 2, 3)      # 右侧插入
redis.lpush('list', 0)            # 左侧插入
redis.llen('list')                # 长度
redis.lrange('list', 0, -1)       # 获取范围
redis.lindex('list', 1)           # 获取指定索引
redis.lpop('list')                # 左侧弹出
redis.rpop('list')                # 右侧弹出

4.4 集合操作

python
redis.sadd('tags', 'Book', 'Tea', 'Coffee')  # 添加元素
redis.srem('tags', 'Book')                    # 删除元素
redis.smembers('tags')                        # 所有元素
redis.sismember('tags', 'Book')               # 是否存在
redis.sinter(['tags', 'tags2'])               # 交集
redis.sunion(['tags', 'tags2'])               # 并集
redis.sdiff(['tags', 'tags2'])                # 差集

4.5 有序集合操作

python
redis.zadd('grade', {'Bob': 100, 'Mike': 98})  # 添加(带分数)
redis.zrange('grade', 0, -1, withscores=True)  # 按分数升序
redis.zrevrange('grade', 0, -1)                # 按分数降序
redis.zrank('grade', 'Bob')                    # 排名(升序)
redis.zcount('grade', 80, 100)                 # 分数范围计数

4.6 散列操作

python
redis.hset('price', 'cake', 5)                # 设置字段
redis.hget('price', 'cake')                   # 获取字段
redis.hmset('price', {'apple': 3, 'banana': 2})  # 批量设置
redis.hmget('price', ['apple', 'banana'])     # 批量获取
redis.hgetall('price')                        # 获取全部
redis.hkeys('price')                          # 所有键
redis.hvals('price')                          # 所有值

4.7 键操作

python
redis.exists('name')              # 键是否存在
redis.delete('name')              # 删除键
redis.type('name')                # 键类型
redis.keys('n*')                  # 模式匹配
redis.expire('name', 10)          # 设置过期时间(秒)
redis.ttl('name')                 # 获取剩余时间

五、存储方式选择建议

存储方式特点适用场景
TXT简单、兼容性好日志、临时数据
JSON结构化、易读API 数据、配置文件
CSV表格形式、Excel 兼容数据分析、导入导出
MySQLACID 事务、复杂查询结构化数据、关系型数据
MongoDB灵活 Schema、高性能非结构化数据、大规模存储
Redis内存存储、极速读写缓存、队列、去重

爬虫常用组合

  • 小规模:CSV / JSON 文件
  • 中等规模:MongoDB(灵活,无需建表)
  • 大规模分布式:MongoDB + Redis(Redis 用于队列和去重)

← 返回 Python 深度研究