分页查询
分页查询是处理大量数据时常用的技术,通过分页可以将数据分成多个小部分,方便用户逐页查看。SQLAlchemy 提供了简单易用的方法来实现分页查询。
本篇我们也会在最终实现这样的分页效果:

1. 什么是分页查询
分页查询是将查询结果按照一定数量分成多页展示,每页显示固定数量的记录。分页查询通常使用两个参数:
limit:每页显示的记录数量。offset:跳过的记录数量。
例如,要查询第二页,每页显示 10 条记录:
limit:10offset:10
2. 使用 SQLAlchemy 实现分页查询
基本查询
首先,我们需要一个基本的查询来获取数据:
import db from model import Student def basic_query(): students = db.session.query(Student).all() for stu in students: print(stu.to_dict())
使用 limit 和 offset
前文中,我们已经了解到 SQLAlchemy 提供了 limit 和 offset 方法来实现分页查询。limit 限制返回的记录数量,offset 跳过指定数量的记录。
import db from model import Student def paginated_query(page, per_page): q = db.select(Student).limit(per_page).offset((page - 1) * per_page) students = db.session.execute(q).scalars() for stu in students: print(stu.to_dict())
例如,要获取第 2 页,每页显示 10 条记录:
paginated_query(2, 10)
对应的 SQL 语句:
SELECT * FROM tb_student LIMIT 10 OFFSET 10;
3. 前后端实现分页功能
后端分页
在后端实现分页功能时,可以创建一个函数来处理分页逻辑。这个函数接受 page 和 per_page 参数,并返回当前页的数据和总页数。
import db from model import Student def get_paginated_students(page, per_page): total = db.session.query(Student).count() q = db.select(Student).limit(per_page).offset((page - 1) * per_page) students = db.session.execute(q).scalars() return { 'total': total, 'page': page, 'per_page': per_page, 'pages': (total + per_page - 1) // per_page, 'data': [stu.to_dict() for stu in students] }
前端分页
在前端实现分页时,可以使用后端提供的分页数据来渲染页面:
{ "total": 100, "page": 2, "per_page": 10, "pages": 10, "data": [ {"id": 11, "name": "Student 11", ...}, {"id": 12, "name": "Student 12", ...}, ... ] }
前端可以根据这些数据渲染分页控件和当前页的数据。
[拓展] Flask 分页演示
下面是一个前后端不分离的 Flask 项目,代码文件比较多,你需要自行理一下。同时也要保证 Flask 、 Flask-SQLAlchemy 与 Flask-MysqlDB 的安装。
pip install flask pip install flask-sqlalchemy # 兼容 Flask 的 SQLAlchemy 框架,提供 ORM 功能 pip install flask-mysqldb # 为 Flask-SQLAlchemy 提供 MySQL 驱动
Flask 项目目录如下:
flask_app/ # 项目目录 ├── templates/ # 模板目录 │ └── list.html # 模板文件 ├── config.py # Flask 配置文件 ├── db.py # 数据库核心文件,包含重要操作 ├── manage.py # Flask 路由和业务视图文件 └── models.py # 数据库模型文件
首先看一下配置文件 config.py:
class Config: SQLALCHEMY_DATABASE_URI = 'mysql://root:0908@localhost:3306/db_flask_demo_school?charset=utf8mb4' # 数据库连接。自行替换数据库用户名称和密码以及实际数据库名 SQLALCHEMY_ECHO = False # 是否打印执行的 SQL 语句及其耗时 DEBUG = True # 是否启用调试模式
db.py
""" Create database: > create database db_flask_demo_school charset=utf8mb4 """ from flask_sqlalchemy import SQLAlchemy from sqlalchemy import * db = SQLAlchemy()
models.py
from db import * class Student(db.Model): __tablename__ = 'tb_student2' id = db.Column(db.Integer, primary_key=True, comment="主键") name = db.Column(db.String(15), index=True, comment="姓名") age = db.Column(db.SmallInteger, comment="年龄") sex = db.Column(db.Boolean, comment="性别") email = db.Column(db.String(128), unique=True, comment="邮箱地址") money = db.Column(db.Numeric(10, 2), default=0.0, comment="钱包") def to_dict(self): return { 'id': self.id, 'name': self.name, 'age': self.age, 'sex': self.sex, 'email': self.email, 'money': float(self.money) } def __repr__(self): return f'<{self.__class__.__name__}: {self.name}>'
然后就是 manage.py,编写了路由与业务代码:
from pathlib import Path from flask import Flask, jsonify, request, render_template from config import Config from models import db, Student app = Flask(__name__, template_folder='./templates') app.config.from_object(Config) db.init_app(app) @app.route('/', methods=['GET']) def index(): """没啥用,勿看""" title = Path(__file__).name return title @app.route('/students', methods=['POST']) def create_student(): """采集访问的信息,创建学生""" sex = request.form.get('sex') sex = int(sex) if sex.isdigit() else 0 student = Student( name=request.form.get('name', '未知'), age=request.form.get('age', 0), sex=bool(sex), email=request.form.get('email', ''), money=request.form.get('money', 0), ) if request.form.get('id', None) is not None: student.id = request.form['id'] db.session.add(student) db.session.commit() return jsonify({ 'success': True, 'data': student.to_dict(), 'msg': 'success' }), 201 @app.route('/students', methods=['DELETE']) def delete_students(): """删除学生表的所有记录""" db.session.execute(db.delete(Student)) db.session.commit() return jsonify({ 'success': True, 'data': None, 'msg': 'success' }) @app.route('/students', methods=['GET']) def get_students(): # 旧版本 2.x 获取全部数据 # students = Student.query.all() # 新版本 3.1.x 获取全部数据 students = db.session.execute(db.select(Student).where()).scalars() return jsonify({ 'success': True, 'data': { 'count': Student.query.count(), 'students': [student.to_dict() for student in students] }, 'msg': 'success' }) @app.route('/students/<int:student_id>', methods=['GET']) def get_student(student_id): # 根据主键查询数据,不存在则为 None student = db.session.get(Student, student_id) if not student: return jsonify({ 'success': False, 'data': None, 'msg': 'student not found' }) return jsonify({ 'success': True, 'data': student.to_dict(), 'msg': 'success' }) @app.route('/students/data', methods=['GET']) def students_data(): """这里是分页器的使用,不同于我们所使用的 limit 和 offset 需要自己编写""" # 不采取数据分页时,大量数据时会导致服务器运存膨胀,这是非常不妥的 page = request.args.get('page', 1, type=int) per_page = request.args.get('size', 3, type=int) # 创建分页器对象 pagination = Student.query.paginate(page=page, per_page=per_page, max_per_page=20) print('当前页对象', pagination) print('总数据量', pagination.total) print('当前页数据列表', pagination.items) print('总页码', pagination.pages) print() print('是否有上一页', pagination.has_prev) print('上一页页码', pagination.prev_num) print('上一页对象', pagination.prev()) print('上一页对象的数据列表', pagination.prev().items) print() print('是否有下一页', pagination.has_next) print('下一页页码', pagination.next_num) print('下一页对象', pagination.next()) print('下一页对象的数据列表', pagination.next().items) # """前后端分离推荐使用的 json 结果,这里没用到""" data = { "page": pagination.page, # 当前页码 "pages": pagination.pages, # 总页码 "has_prev": pagination.has_prev, # 是否有上一页 "prev_num": pagination.prev_num, # 上一页页码 "has_next": pagination.has_next, # 是否有下一页 "next_num": pagination.next_num, # 下一页页码 "items": [{ "id": item.id, "name": item.name, "age": item.age, "sex": item.sex, "money": item.money, } for item in pagination.items] } return render_template('list.html', **locals()) if __name__ == '__main__': with app.app_context(): db.drop_all() # 启动时先删除相关表,后创建相关表 db.create_all() app.run('0.0.0.0', 9527)
最后就是 list.html 这个模板文件,呈现一个分页的演示:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <style> body { font-family: Arial, sans-serif; background-color: #f4f7fa; color: #333; } table { border-collapse: collapse; margin: 50px auto; width: 80%; box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1); background-color: #fff; } th, td { padding: 12px 15px; text-align: center; } th { background-color: #007bff; color: #fff; text-transform: uppercase; } tr:nth-child(even) { background-color: #f2f2f2; } tr:hover { background-color: #e9f5ff; } .page { margin: 20px auto; text-align: center; } .page a, .page span { padding: 8px 16px; margin: 0 4px; color: #007bff; background: #fff; border: 1px solid #007bff; border-radius: 4px; text-decoration: none; transition: background-color 0.3s, color 0.3s; } .page a:hover { background-color: #007bff; color: #fff; } .page span { background-color: #007bff; color: #fff; } </style> </head> <body> <table border="1" align="center" width="600"> <tr> <th>ID</th> <th>Age</th> <th>Name</th> <th>Sex</th> <th>Money</th> </tr> {% for student in pagination.items %} <tr> <td>{{ student.id }}</td> <td>{{ student.age }}</td> <td>{{ student.name }}</td> <td>{{ "男" if student.sex else "女" }}</td> <td>{{ student.money }}</td> </tr> {% endfor %} <tr align="center"> <td colspan="5" class="page"> {% if pagination.has_prev %} <a href="?page=1">首 页</a> <a href="?page={{ pagination.page - 1 }}">上一页</a> <a href="?page={{ pagination.page - 1 }}">{{ pagination.page - 1 }}</a> {% endif %} <span>{{ pagination.page }}</span> {% if pagination.has_next %} <a href="?page={{ pagination.page + 1 }}">{{ pagination.page + 1 }}</a> <a href="?page={{ pagination.page + 1 }}">下一页</a> <a href="?page={{ pagination.pages }}">尾 页</a> {% endif %} </td> </tr> </table> </body> </html>
为了确保能够有一定数量的数据,请你另外新建一个 request.py,用于创建大量数据(如果你知道 faker 的使用,也可以自己弄一些数据),先启动 manage.py,保证后端服务的开启和路由可用,然后直接运行该文件后可添加测试数据:
# request.py import requests # pip install requests students = [ # 虚拟数据,务必当真 { 'name': '王毅', 'age': 21, 'sex': 1, 'email': 'wangyi@gmail.com', 'money': 4488.5 }, { 'name': '张晓', 'age': 19, 'sex': 0, 'email': 'zhangxiao@example.com', 'money': 2389.75 }, { 'name': '李春阳', 'age': 23, 'sex': 1, 'email': 'lichunyang@outlook.com', 'money': 6715.32 }, { 'name': '刘瑞', 'age': 20, 'sex': 0, 'email': 'liurui@yahoo.com', 'money': 3456.89 }, { 'name': '陈欢', 'age': 22, 'sex': 1, 'email': 'chenhuan@gmail.com', 'money': 5678.12 }, { 'name': '吴娜', 'age': 18, 'sex': 0, 'email': 'wuna@example.org', 'money': 1234.56 }, { 'name': '赵丹', 'age': 24, 'sex': 0, 'email': 'zhaoda@outlook.com', 'money': 7890.43 }, { 'name': '孙宇', 'age': 21, 'sex': 1, 'email': 'sunyu@yahoo.co.jp', 'money': 4567.89 }, { 'name': '黄宇', 'age': 19, 'sex': 1, 'email': 'huangyu@gmail.com', 'money': 2345.67 }, { 'name': '杨静', 'age': 22, 'sex': 0, 'email': 'yangjing@example.com', 'money': 6789.01 } ] for student in students: response = requests.request('POST', 'http://127.0.0.1:9527/students', data=student) print('添加一条记录', response.json())
确定 Flask 项目正常启动,并且上面的数据也完成了注入,如果你发现启动失败了,请检查路由、数据库连接是否有问题,你可能需要一定的 Flask 基础知识。接下来如何访问我们渲染的模板呢?
根据路由视图和设置的访问端口(9527):
@app.route('/students/data', methods=['GET']) def students_data(): ... return render_template('list.html', **locals())
我们直接在浏览器访问:http://127.0.0.1:9527/students/data 这个地址即可。
上述案例是演示所用,随意写的,小部分代码参考了某机构的教程代码示例,平台原因无法标注,路由设计也是很随便的,这种代码如果存在版权纠纷,emmm.....,请联系我删除,谢谢。无私开源,只为搞懂后端开发的学习,请勿钻牛角……