创建 db.py
db.py 文件是我们管理数据库连接和模型基类的地方。它让我们的代码更加模块化和可维护,实际生产中也是类似的,无论是在 FastAPI 或者 Flask 等框架中,当使用到 SqlAlchemy 时,的的确确需要一个单独 db.py,存储着引擎、会话以及模型基类。这个文件请务必建好,后续所有教程都会使用到。
from sqlalchemy import * from sqlalchemy.orm import sessionmaker, declarative_base, Session, DeclarativeBase # 1.创建数据库驱动引擎 engine = create_engine( # url = '驱动://账户:密码@地址:端口/数据库名?charset=编码' url='mysql+pymysql://root:0908@localhost:3306/db_sqlalchemy_demo?charset=utf8mb4', echo=True, pool_size=8, # 连接池的数据库连接数量 max_overflow=30, # 连接池的数据库连接最大数量 pool_recycle=60 * 30, # 设置秒数限制数据库多久没连接自动断开 ) # 2.基于底层数据库驱动建立数据库连接会话 # DBSession = sessionmaker(bind=engine) # session = DBSession() session = Session(bind=engine) # 3.模型对象基类,提供数据库的基操和方法 # Model = declarative_base() class Model(DeclarativeBase): pass
创建 model.py
该文件负责模型类创建,需要基于 db.py 文件中的各个对象来创建。方便于其他文件或者生产开发中的引入(比如 Flask 项目可以根据业务引入相对应的模型类,进行增删改查操作)
import datetime import db class Student(db.Model): __tablename__ = 'tb_student' id = db.Column(db.Integer, primary_key=True, comment='学生编号') name = db.Column(db.String(20), comment='学生姓名') sex = db.Column(db.Boolean, default=True, comment='学生性别') age = db.Column(db.SmallInteger, comment='学生年龄') class_ = db.Column('class', db.SMALLINT, comment='学生班级') description = db.Column(db.Text, comment='个性签名') status = db.Column(db.Boolean, default=True, comment='登录状态') addtime = db.Column(db.DateTime, default=datetime.datetime.now, comment='入学时间') orders = db.Column(db.SMALLINT, default=True, comment='学生排序') def __repr__(self): return f'<{self.__class__.__name__}: {self.name}({self.id})>' def to_dict(self): return { 'id': self.id, 'name': self.name, 'sex': self.sex, 'age': self.age, 'class': self.class_, 'description': self.description, 'status': self.status, 'addtime': self.addtime.strftime('%Y-%m-%d %H:%M:%S'), 'orders': self.orders, }
在此之前,确保数据库和表已经创建。如果没有创建,可以运行以下代码来创建表:
import db db.Model.metadata.create_all(db.engine) # 删除所有表格如下操作: db.Model.metadata.drop_all(db.engine)
添加数据
请注意,上方 db.py 以及 model.py 是在同一目录下的。
import db from model import Student def run(): try: student = Student( id=1, name='王小明', sex=True, # 默认值为 True,可以省略 age=18, class_=3, description='滚出去', status=True, # 默认值为 True,可以省略 orders=0 # 默认值为 0,可以省略 ) # 实例化学生模型类 db.session.add(student) # 通过会话告知添加 # 另外还有一个 add_all([student1, ...]) db.session.commit() # 我们使用的 MySQL 是支持事务操作的,上述实例化、添加无误后,可提交,这样才是真正的添加数据完毕 print(student.to_dict()) # 打印瞅瞅 except Exception as e: db.session.rollback() # 出现异常就回滚(事务操作),并打印异常 print(f"Error: {e}") finally: db.session.close() # 最终手动关闭会话 if __name__ == '__main__': run()
添加多条数据如下:
import db from model import Student students = [ Student(name='汪伦', age=19, class_=3, description='滚出去', ), Student(name='上官丽丽', age=18, sex=False, class_=1, description='滚进来', ), ] db.session.add_all(students) db.session.commit()
查询数据
查询一条数据
现在我们来查询刚刚添加的数据:
import db from model import Student # 调用会话的 Query 对象,该对象有一个 get 方法,传入主键值即可 # 查询 id 为 1 的学生,也就是 王小明 student = db.session.query(Student).get(1) # 单个主键 联合主键则使用(1,2) 或者 {'id': 1, 'class_id': 2} print(student.to_dict()) # 上述可能会提示警告,可使用下列方式 q1 = db.select(Student).where(Student.id == 1) # SELECT * FROM Student WHERE id == 1 student = db.session.execute(q1).scalar() print(student.to_dict()) # 该方式也行 q1 = db.select(Student) # SELECT * FROM Student student = db.session.execute(q1).scalar() # 取出一条数据 print(student.to_dict()) # 这样也可以查询id为 1 的学生,方式还是很多的 student = db.session.query(Student).filter_by(id=1).first() print(student.to_dict()) print(student.__dict__)
在后续的教程中,无非都是利用 db 中的 select、delete 等语句结合会话执行,又或者是会话中的 Query 对象进行筛选限制……
查询多条数据
import db from model import Student # 获取所有的数据,其实和 SQL 语句还是有相似之处的 q2 = db.select(Student) # SELECT * FROM Student students = db.session.execute(q2).scalars() # 创建迭代器对象 # scalar 是查询出来的首个,scalars 是所有,并不是所有数据,而是迭代器,遍历可拿到 # for stu in students: # print(stu.to_dict()) # 务必注意以下这一点 print(students) # 正常打印数据 print(students) # 打印空列表 [] # 为什么?请参考迭代器的原理,简而言之就是(拿完了,没了~) # 怎么解决?转成列表(赋值或者拷贝?未测试过,自行试试吧!)
条件过滤数据
import db from model import Student # 官网示例的方式如下: q3 = db.select(Student).where(Student.class_ == 3) students = db.session.execute(q3).scalars() for stu in students: print(stu.to_dict()) # 你也可以这样: students = db.session.query(Student).filter_by(class_=3).all() print(students) students = db.session.query(Student).filter(Student.class_ == 3).all() print(students)
值得注意的是 filter_by 不支持 大于小于等 范围查询,而需要使用 filter 或者官网示例中 Select 对象 where 语句进行查询。
逻辑查询数据
如果想使用并且、或者、取非这些逻辑条件怎么办呢?SQLAlchemy 可不支持我们 Python 语法的条件,你可以试试,是否报错或者结果有问题!如果数据没问题,我只能说巧了,少部分条件还是可以判断到的,但想要精确无误,还是需要使用下方的三个函数。
SQLAlchemy 内置有这三个相关的函数:and_、not_、or_,可相互嵌套使用。
- and_(条件1, 条件2, ...)
- not_(条件)
- or_(条件1, 条件2, ...)
import db from model import Student # 查询班级不是1的学生或者非女生,条件看似复杂,有内到外拆分 # 内部 and_ 表示 班级1的女生,外层是 not_,取反即可,班级1的女生除外,其他都能取 # 经过数学转义并更贴切表示就是:(not (班级 == 1)) or (not (性别 == 女)) q4 = db.select(Student).where(db.not_(db.and_(Student.class_ == 1, Student.sex == 0))) students = db.session.execute(q4).scalars() for stu in students: print(stu.to_dict()) # 三个条件函数可支持 select 对象中的 where 以及 会话 Query 对象中的 filter students = db.session.query(Student).filter(db.not_(db.and_(Student.class_ == 1, Student.sex == 0))).all() for stu in students: print(stu.to_dict())
到目前为止,我认为你有必要牢记两个对象,一个是 Select,源于 SqlAlchemy,另一个是 Qurey,源于我们的会话 session 中。Select 对象或者将来的 Delete 等,本质上可看做是构造 SQL 语句的,构造后通过会话的 execute 方法执行,有数据返回时可调用 scalar 或者 scalars 获取。而 Query 是查询过滤专用的,基于会话直接使用,可调用 all、first 等方法获取数据。
成员查询数据
除了上述的三个逻辑函数,还有一个模型对象的字段对象所拥有的方法:in_,也就是类似于我们 Python 中的成员运算符 in。SQLAlchemy 同样不支持 in 运算符,因此你还需要掌握字段对象的 in_ 方法
'a' in 'abcde' # SQLAlchemy student.id.in_([1, 3, 5, 6])
请参考实际例子:
import db from model import Student # 查询学生编号为1、2的学生 students = db.session.query(Student).filter(Student.id.in_([1, 2])).all() for stu in students: print(stu.to_dict()) # 使用 Select 对象也可~ q5 = db.select(Student).where(Student.id.in_([1, 2])) students = db.session.execute(q5).scalars() for stu in students: print(stu.to_dict())
更新数据
查询到对象后,直接对对象属性进行赋值操作,可修改记录的字段值,再通过 commit 提交后,可保存记录。
import db from model import Student q = db.select(Student).where(Student.id == 1) student = db.session.execute(q).scalar() print(student.to_dict()) # 打印查询到的数据 student.name = '张晓明' # 将 王小明 转成 张晓明 db.session.commit() # 提交事务 # 再此查询一遍 q = db.select(Student).where(Student.id == 1) student = db.session.execute(q).scalar() print(student.to_dict()) # 打印数据
删除数据
删除某条记录,首先需要有删除的条件,其次才能删除。
import db from model import Student # 方式1 q = db.select(Student).where(Student.id == 1) student = db.session.execute(q).scalar() db.session.delete(student) db.session.commit() # 方式2 # q = db.delete(Student).where(Student.id == 1) # db.session.execute(q) # db.session.commit() # 查询一下数据 q = db.select(Student).where(Student.id == 1) student = db.session.execute(q).scalar() print(student) # 应为 None
以上例子仅供参考,具体条件可以更加的灵活和简单。