SQLAlchemy – 模块文件以及增删改查(CURD操作)

创建 db.py

db.py 文件是我们管理数据库连接和模型基类的地方。它让我们的代码更加模块化和可维护,实际生产中也是类似的,无论是在 FastAPI 或者 Flask 等框架中,当使用到 SqlAlchemy 时,的的确确需要一个单独 db.py,存储着引擎、会话以及模型基类。这个文件请务必建好,后续所有教程都会使用到。

from sqlalchemy import * from sqlalchemy.orm import sessionmaker, declarative_base, Session, DeclarativeBase  # 1.创建数据库驱动引擎 engine = create_engine(     # url = '驱动://账户:密码@地址:端口/数据库名?charset=编码'     url='mysql+pymysql://root:0908@localhost:3306/db_sqlalchemy_demo?charset=utf8mb4',     echo=True,     pool_size=8,  # 连接池的数据库连接数量     max_overflow=30,  # 连接池的数据库连接最大数量     pool_recycle=60 * 30,  # 设置秒数限制数据库多久没连接自动断开 )  # 2.基于底层数据库驱动建立数据库连接会话 # DBSession = sessionmaker(bind=engine) # session = DBSession() session = Session(bind=engine)   # 3.模型对象基类,提供数据库的基操和方法 # Model = declarative_base() class Model(DeclarativeBase):     pass  

创建 model.py

该文件负责模型类创建,需要基于 db.py 文件中的各个对象来创建。方便于其他文件或者生产开发中的引入(比如 Flask 项目可以根据业务引入相对应的模型类,进行增删改查操作)

import datetime import db   class Student(db.Model):     __tablename__ = 'tb_student'     id = db.Column(db.Integer, primary_key=True, comment='学生编号')     name = db.Column(db.String(20), comment='学生姓名')     sex = db.Column(db.Boolean, default=True, comment='学生性别')     age = db.Column(db.SmallInteger, comment='学生年龄')     class_ = db.Column('class', db.SMALLINT, comment='学生班级')     description = db.Column(db.Text, comment='个性签名')     status = db.Column(db.Boolean, default=True, comment='登录状态')     addtime = db.Column(db.DateTime, default=datetime.datetime.now, comment='入学时间')     orders = db.Column(db.SMALLINT, default=True, comment='学生排序')      def __repr__(self):         return f'<{self.__class__.__name__}: {self.name}({self.id})>'      def to_dict(self):         return {             'id': self.id,             'name': self.name,             'sex': self.sex,             'age': self.age,             'class': self.class_,             'description': self.description,             'status': self.status,             'addtime': self.addtime.strftime('%Y-%m-%d %H:%M:%S'),             'orders': self.orders,         }  

在此之前,确保数据库和表已经创建。如果没有创建,可以运行以下代码来创建表:

import db   db.Model.metadata.create_all(db.engine) # 删除所有表格如下操作: db.Model.metadata.drop_all(db.engine) 

添加数据

请注意,上方 db.py 以及 model.py 是在同一目录下的。

import db from model import Student  def run():     try:         student = Student(             id=1,             name='王小明',             sex=True,  # 默认值为 True,可以省略             age=18,             class_=3,             description='滚出去',             status=True,  # 默认值为 True,可以省略             orders=0  # 默认值为 0,可以省略         )  # 实例化学生模型类         db.session.add(student)  # 通过会话告知添加         # 另外还有一个 add_all([student1, ...])         db.session.commit()  # 我们使用的 MySQL 是支持事务操作的,上述实例化、添加无误后,可提交,这样才是真正的添加数据完毕         print(student.to_dict())  # 打印瞅瞅     except Exception as e:         db.session.rollback()  # 出现异常就回滚(事务操作),并打印异常         print(f"Error: {e}")     finally:         db.session.close()  # 最终手动关闭会话  if __name__ == '__main__':     run()  

添加多条数据如下:

import db from model import Student      students = [     Student(name='汪伦', age=19, class_=3, description='滚出去', ),     Student(name='上官丽丽', age=18, sex=False, class_=1, description='滚进来', ), ] db.session.add_all(students) db.session.commit() 

查询数据

查询一条数据

现在我们来查询刚刚添加的数据:

import db from model import Student   # 调用会话的 Query 对象,该对象有一个 get 方法,传入主键值即可 # 查询 id 为 1 的学生,也就是 王小明 student = db.session.query(Student).get(1)  # 单个主键 联合主键则使用(1,2) 或者 {'id': 1, 'class_id': 2} print(student.to_dict()) # 上述可能会提示警告,可使用下列方式 q1 = db.select(Student).where(Student.id == 1)  # SELECT * FROM Student WHERE id == 1 student = db.session.execute(q1).scalar() print(student.to_dict()) # 该方式也行 q1 = db.select(Student)   # SELECT * FROM Student student = db.session.execute(q1).scalar()  # 取出一条数据 print(student.to_dict()) # 这样也可以查询id为 1 的学生,方式还是很多的 student = db.session.query(Student).filter_by(id=1).first() print(student.to_dict()) print(student.__dict__) 

在后续的教程中,无非都是利用 db 中的 select、delete 等语句结合会话执行,又或者是会话中的 Query 对象进行筛选限制……

查询多条数据

import db from model import Student  # 获取所有的数据,其实和 SQL 语句还是有相似之处的 q2 = db.select(Student)    # SELECT * FROM Student students = db.session.execute(q2).scalars()  # 创建迭代器对象 # scalar 是查询出来的首个,scalars 是所有,并不是所有数据,而是迭代器,遍历可拿到 # for stu in students: #     print(stu.to_dict())  # 务必注意以下这一点 print(students)  # 正常打印数据 print(students)  # 打印空列表 [] # 为什么?请参考迭代器的原理,简而言之就是(拿完了,没了~) # 怎么解决?转成列表(赋值或者拷贝?未测试过,自行试试吧!) 

条件过滤数据

import db from model import Student   # 官网示例的方式如下: q3 = db.select(Student).where(Student.class_ == 3) students = db.session.execute(q3).scalars() for stu in students:     print(stu.to_dict()) # 你也可以这样: students = db.session.query(Student).filter_by(class_=3).all() print(students) students = db.session.query(Student).filter(Student.class_ == 3).all() print(students) 

值得注意的是 filter_by 不支持 大于小于等 范围查询,而需要使用 filter 或者官网示例中 Select 对象 where 语句进行查询。

逻辑查询数据

如果想使用并且、或者、取非这些逻辑条件怎么办呢?SQLAlchemy 可不支持我们 Python 语法的条件,你可以试试,是否报错或者结果有问题!如果数据没问题,我只能说巧了,少部分条件还是可以判断到的,但想要精确无误,还是需要使用下方的三个函数。

SQLAlchemy 内置有这三个相关的函数:and_not_or_,可相互嵌套使用。

  1. and_(条件1, 条件2, ...)
  2. not_(条件)
  3. or_(条件1, 条件2, ...)
import db from model import Student  # 查询班级不是1的学生或者非女生,条件看似复杂,有内到外拆分 # 内部 and_ 表示 班级1的女生,外层是 not_,取反即可,班级1的女生除外,其他都能取 # 经过数学转义并更贴切表示就是:(not (班级 == 1)) or (not (性别 == 女)) q4 = db.select(Student).where(db.not_(db.and_(Student.class_ == 1, Student.sex == 0))) students = db.session.execute(q4).scalars() for stu in students:     print(stu.to_dict()) # 三个条件函数可支持 select 对象中的 where 以及 会话 Query 对象中的 filter students = db.session.query(Student).filter(db.not_(db.and_(Student.class_ == 1, Student.sex == 0))).all() for stu in students:     print(stu.to_dict()) 

到目前为止,我认为你有必要牢记两个对象,一个是 Select,源于 SqlAlchemy,另一个是 Qurey,源于我们的会话 session 中。Select 对象或者将来的 Delete 等,本质上可看做是构造 SQL 语句的,构造后通过会话的 execute 方法执行,有数据返回时可调用 scalar 或者 scalars 获取。而 Query 是查询过滤专用的,基于会话直接使用,可调用 allfirst 等方法获取数据。

成员查询数据

除了上述的三个逻辑函数,还有一个模型对象的字段对象所拥有的方法:in_,也就是类似于我们 Python 中的成员运算符 in。SQLAlchemy 同样不支持 in 运算符,因此你还需要掌握字段对象的 in_ 方法

'a' in 'abcde' # SQLAlchemy student.id.in_([1, 3, 5, 6]) 

请参考实际例子:

import db from model import Student   # 查询学生编号为1、2的学生 students = db.session.query(Student).filter(Student.id.in_([1, 2])).all() for stu in students:     print(stu.to_dict())      # 使用 Select 对象也可~ q5 = db.select(Student).where(Student.id.in_([1, 2])) students = db.session.execute(q5).scalars() for stu in students:     print(stu.to_dict())  

更新数据

查询到对象后,直接对对象属性进行赋值操作,可修改记录的字段值,再通过 commit 提交后,可保存记录。

import db from model import Student  q = db.select(Student).where(Student.id == 1) student = db.session.execute(q).scalar() print(student.to_dict())  # 打印查询到的数据 student.name = '张晓明'  # 将 王小明 转成 张晓明 db.session.commit()  # 提交事务  # 再此查询一遍 q = db.select(Student).where(Student.id == 1) student = db.session.execute(q).scalar() print(student.to_dict())  # 打印数据 

删除数据

删除某条记录,首先需要有删除的条件,其次才能删除。

import db from model import Student   # 方式1 q = db.select(Student).where(Student.id == 1) student = db.session.execute(q).scalar() db.session.delete(student) db.session.commit() # 方式2 # q = db.delete(Student).where(Student.id == 1) # db.session.execute(q) # db.session.commit()  # 查询一下数据 q = db.select(Student).where(Student.id == 1) student = db.session.execute(q).scalar() print(student)  # 应为 None 

以上例子仅供参考,具体条件可以更加的灵活和简单。

发表评论

评论已关闭。

相关文章