SQLAIchemy 异步DBManager封装-03得心应手

前言

  1. SQLAIchemy 异步DBManager封装-01入门理解
  2. SQLAIchemy 异步DBManager封装-02熟悉掌握

在前两篇文章中,我们详细介绍了SQLAlchemy异步DBManager的封装过程。第一篇文章帮助我们入门理解了整体的封装结构和思路,第二篇文章则帮助我们更加熟悉和掌握了这个封装的使用。我们已经介绍了添加和查询操作,并且对整体的封装思路有了深入的了解。

在本文中,我将继续扩展封装,介绍如何进行更新和删除操作。同时,我将演示如何执行原生的SQL语句,并介绍在异常情况下如何进行事务回滚的场景。这些内容将帮助我们更全面地应对各种数据库操作的需求。

更新封装

from sqlalchemy import Result, column, delete, func, select, text, update

@with_session
async def update(
        self,
        values: dict,
        *,
        orm_table: Type[BaseOrmTable] = None,
        conds: list = None,
        session: AsyncSession = None,
):
    """
    更新数据
    Args:
        values: 要更新的字段和对应的值,字典格式,例如 {"field1": value1, "field2": value2, ...}
        orm_table: ORM表映射类
        conds: 更新条件列表,每个条件为一个表达式,例如 [UserTable.username == "hui", ...]
        session: 数据库会话对象,如果为 None,则在方法内部开启新的事务

    Returns: 影响的行数
        cursor_result.rowcount
    """
    orm_table = orm_table or self.orm_table
    conds = conds or []
    values = values or {}
    if not values:
        return
    sql = update(orm_table).where(*conds).values(**values)
    cursor_result = await session.execute(sql)
    return cursor_result.rowcount

@with_session
async def update_or_add(
        self,
        table_obj: [T_BaseOrmTable, dict],
        *,
        orm_table: Type[BaseOrmTable] = None,
        session: AsyncSession = None,
        **kwargs,
):
    """
    指定对象更新or添加数据
    Args:
        table_obj: 映射类实例对象 or dict,
            e.g. UserTable(username="hui", age=18) or {"username": "hui", "v": 18, ...}
        orm_table: ORM表映射类
        session: 数据库会话对象,如果为 None,则在方法内部开启新的事务

    Returns:
    """
    orm_table = orm_table or self.orm_table
    if isinstance(table_obj, dict):
        table_obj = orm_table(**table_obj)

    return await session.merge(table_obj, **kwargs)
  • update 方法通过 sqlaichemy 的 update 来组织sql语句进行条件更新
  • update_or_add 则是指定对象进行更新或新增操作,有主键id则更新,没有则添加,具体是使用 session.merge 方法进行操作。入参的 table_obj 可以是库表映射类实例对象、dict,字典形式则是通过 Manager 下的orm_table 进行转换成映射类实例对象来操作。
class UserFileTable(BaseOrmTable):
    """用户文件表"""

    __tablename__ = "user_file"
    filename: Mapped[str] = mapped_column(String(100), default="", comment="文件名称")
    creator: Mapped[int] = mapped_column(default=0, comment="文件创建者")
    file_suffix: Mapped[str] = mapped_column(String(100), default="", comment="文件后缀")
    file_size: Mapped[int] = mapped_column(default=0, comment="文件大小")
    oss_key: Mapped[str] = mapped_column(String(100), default="", comment="oss key(minio)")
    is_del: Mapped[int] = mapped_column(default=0, comment="是否删除")
    deleted_at: Mapped[datetime] = mapped_column(nullable=True, comment="删除时间")
    
    
class UserFileManager(DBManager):
    orm_table = UserFileTable
    
    
async def update_demo():
    ret = await UserFileManager().update(values={"filename": "hui"}, conds=[UserFileTable.id == 1])
    print("update ret", ret)

    # 添加
    user_file_info = {"filename": "huidbk", "oss_key": uuid.uuid4().hex}
    user_file: UserFileTable = await UserFileManager().update_or_add(table_obj=user_file_info)
    print("update_or_add add", user_file)

    # 更新
    user_file.file_suffix = "png"
    user_file.file_size = 100
    user_file.filename = "hui-update_or_add"
    ret: UserFileTable = await UserFileManager().update_or_add(table_obj=user_file)
    print("update_or_add update", ret)

删除封装

@with_session
async def bulk_delete_by_ids(
        self,
        pk_ids: list,
        *,
        orm_table: Type[BaseOrmTable] = None,
        logic_del: bool = False,
        logic_field: str = "deleted_at",
        logic_del_set_value: Any = None,
        session: AsyncSession = None,
):
    """
    根据主键id批量删除
    Args:
        pk_ids: 主键id列表
        orm_table: orm表映射类
        logic_del: 逻辑删除,默认 False 物理删除 True 逻辑删除
        logic_field: 逻辑删除字段 默认 deleted_at
        logic_del_set_value: 逻辑删除字段设置的值
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Returns: 删除的记录数
    """
    orm_table = orm_table or self.orm_table
    conds = [orm_table.id.in_(pk_ids)]
    return await self.delete(
        conds=conds,
        orm_table=orm_table,
        logic_del=logic_del,
        logic_field=logic_field,
        logic_del_set_value=logic_del_set_value,
        session=session,
    )

@with_session
async def delete_by_id(
        self,
        pk_id: int,
        *,
        orm_table: Type[BaseOrmTable] = None,
        logic_del: bool = False,
        logic_field: str = "deleted_at",
        logic_del_set_value: Any = None,
        session: AsyncSession = None,
):
    """
    根据主键id删除
    Args:
        pk_id: 主键id
        orm_table: orm表映射类
        logic_del: 逻辑删除,默认 False 物理删除 True 逻辑删除
        logic_field: 逻辑删除字段 默认 deleted_at
        logic_del_set_value: 逻辑删除字段设置的值
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Returns: 删除的记录数
    """
    orm_table = orm_table or self.orm_table
    conds = [orm_table.id == pk_id]
    return await self.delete(
        conds=conds,
        orm_table=orm_table,
        logic_del=logic_del,
        logic_field=logic_field,
        logic_del_set_value=logic_del_set_value,
        session=session,
    )

@with_session
async def delete(
        self,
        *,
        conds: list = None,
        orm_table: Type[BaseOrmTable] = None,
        logic_del: bool = False,
        logic_field: str = "deleted_at",
        logic_del_set_value: Any = None,
        session: AsyncSession = None,
):
    """
    通用删除
    Args:
        conds: 条件列表, e.g. [UserTable.id == 1]
        orm_table: orm表映射类
        logic_del: 逻辑删除,默认 False 物理删除 True 逻辑删除
        logic_field: 逻辑删除字段 默认 deleted_at
        logic_del_set_value: 逻辑删除字段设置的值
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Returns: 删除的记录数
    """
    orm_table = orm_table or self.orm_table

    if logic_del:
        # 执行逻辑删除操作
        logic_del_info = dict()
        logic_del_info[logic_field] = logic_del_set_value or datetime.now()
        delete_stmt = update(orm_table).where(*conds).values(**logic_del_info)
    else:
        # 执行物理删除操作
        delete_stmt = delete(orm_table).where(*conds)

    cursor_result = await session.execute(delete_stmt)

    # 返回影响的记录数
    return cursor_result.rowcount
  • 通过主键ID单个删除,组织 conds = [orm_table.id == pk_id],调用 delete 方法
  • 通过主键ID列表批量删,组织 conds = [orm_table.id.in_(pk_ids)] 调用 delete 方法

这两种删除操作都是通过调用 delete 方法实现的。默认情况下,这些操作执行的是物理删除。对于一些重要的数据,我们也可以选择执行逻辑删除。在逻辑删除中,默认使用 deleted_at 字段来记录删除时间。我们也可以指定具体的逻辑删除字段 logic_field,以及逻辑字段的赋值情况 logic_del_set_value,然后进行一个更新操作来实现逻辑删除。

如下是删除前的数据


async def delete_demo():
    file_count = await UserFileManager().query_one(cols=[func.count()], flat=True)
    print("file_count", file_count)

    ret = await UserFileManager().delete_by_id(file_count)
    print("delete_by_id ret", ret)

    ret = await UserFileManager().bulk_delete_by_ids(pk_ids=[10, 11, 12])
    print("bulk_delete_by_ids ret", ret)

    ret = await UserFileManager().delete(conds=[UserFileTable.id == 13])
    print("delete ret", ret)

    ret = await UserFileManager().delete(conds=[UserFileTable.id == 5], logic_del=True)
    print("logic_del ret", ret)

    ret = await UserFileManager().delete(
        conds=[UserFileTable.id == 6], logic_del=True, logic_field="is_del", logic_del_set_value=1
    )
    print("logic_del set logic_field ret", ret)

删除结果展示

file_count 20
delete_by_id ret 0
bulk_delete_by_ids ret 3
delete ret 1
logic_del ret 1
logic_del set logic_field ret 1

主键id 为5、6的被逻辑删除了,10,11,12,13 被物理删除了。

执行原生sql


@with_session
async def run_sql(
        self, sql: str, *, params: dict = None, query_one: bool = False, session: AsyncSession = None
) -> Union[dict, List[dict]]:
    """
    执行并提交单条sql
    Args:
        sql: sql语句
        params: sql参数, eg. {":id_val": 10, ":name_val": "hui"}
        query_one: 是否查询单条,默认False查询多条
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Returns:
        执行sql的结果
    """
    sql = text(sql)
    cursor_result = await session.execute(sql, params)
    if query_one:
        return cursor_result.mappings().one() or {}
    else:
        return cursor_result.mappings().all() or []

内部执行sql时需要通过 sqlaichemy 的 text 函数转一下,然后根据 query_one 的值来确定查询单条还是多条。

async def run_raw_sql_demo():
    """运行原生sql demo"""
    count_sql = "select count(*) as total_count from user_file"
    count_ret = await UserFileManager().run_sql(count_sql, query_one=True)
    print("count_ret", count_ret)

    data_sql = "select * from user_file where id > :id_val and file_size >= :file_size_val"
    params = {"id_val": 20, "file_size_val": 0}
    data_ret = await UserFileManager().run_sql(data_sql, params=params)
    print("dict data_ret", data_ret)

    data_sql = "select * from user_file where id > :id_val"
    data_ret = await UserFileManager().run_sql(sql=data_sql, params={"id_val": 4})
    print("dict data_ret", data_ret)

    # 连表查询
    data_sql = """
        select
            user.id as user_id,
            username,
            user_file.id as file_id,
            filename,
            oss_key
        from 
            user_file
            join user on user.id = user_file.creator
        where 
            user_file.creator = :user_id
    """
    data_ret = await UserFileManager().run_sql(data_sql, params={"user_id": 1})
    print("join sql data_ret", data_ret)

需要注意的执行原生sql,sql参数的展位符是 :param_name 冒号后面接参数名称,然后参数对应的值则是字典形式组织。

查询结果如下

count_ret {'total_count': 16}

dict data_ret [{'id': 62, 'filename': 'aaa', 'creator': 0, 'file_suffix': '', 'file_size': 0, 'oss_key': '6dd01a72599e467eb3fcdd9b47e1de9c', 'is_del': 0, 'deleted_at': None}, ..]

dict data_ret [{'id': 5, 'filename': 'eee', 'creator': 0, 'file_suffix': '', 'file_size': 0, 'oss_key': '6892400cc83845aca89b2ebafc675471', 'is_del': 0, 'deleted_at': datetime.datetime(2024, 4, 16, 23, 56, 49)}, ...]

join sql data_ret [{'user_id': 1, 'username': 'hui', 'file_id': 1, 'filename': 'hui', 'oss_key': 'bbb'}]

事务回滚操作

async def create_and_transaction_demo():
    async with UserFileManager.transaction() as session:
        await UserFileManager().bulk_add(
            table_objs=[{"filename": "aaa", "oss_key": uuid.uuid4().hex}], session=session
        )
        user_file_obj = UserFileTable(filename="eee", oss_key=uuid.uuid4().hex)
        file_id = await UserFileManager().add(table_obj=user_file_obj, session=session)
        print("file_id", file_id)

        ret: UserFileTable = await UserFileManager().query_by_id(2, session=session)
        print("query_by_id", ret)
        
        # 异常回滚
        a = 1 / 0

        ret = await UserFileManager().query_one(
            cols=[UserFileTable.filename, UserFileTable.oss_key],
            conds=[UserFileTable.filename == "ccc"],
            session=session
        )
        print("ret", ret)

这里通过 transaction() 获取事务会话 session,让后面的数据库操作都指定 session 参数,with_session 装饰器就不会再次构造,实现了共用一个 session,事务内的操作要么都成功要么都失败。

整体封装总结

  • SQLAIchemyManager 设计

    • 用于初始化数据库配置信息
  • BaseOrmTable、TimestampColumns、BaseOrmTableWithTS 设计

    • 通用库表映射类,一些主键id,时间戳字段让子类继承共享,以及 to_dict 方法将对象属性转成字典
  • transaction 上下文管理器(事务会话)

    • 便捷的进行事务处理
  • with_session 装饰器

    • 复用开启事务会话 session 操作,减少冗余代码,没有 session 则动态的构造 session,兼容整体事务会话
  • orm_table 设计

    • 让继承DBManager的子类指定 orm_table ,数据库操作时明确知道具体库表,减少参数传递
  • DBManager 设计

    • 封装了通用的CRUD方法,让子类可以共享和复用这些方法,推荐子类进行常用业务数据查询封装,实现业务逻辑的复用和灵活性。
  • 查询扁平化 flat

    • 查询结果可以直接使用,不需要额外处理,简化了操作流程。
  • 字典与库表映射类实例

    • 一些方法的入参,同时支持字典与库表映射类实例,提高了方法的通用性和灵活性。
  • 分页查询

    • 指定页码、每页大小查询出总数与分页数据
  • 逻辑删除

    • 支持默认的 deleted_at 字段 or 指定逻辑字段进行逻辑删除,保留重要数据
  • 执行原生sql

    • 一些复杂sql操作,不使用 orm 组织,推荐使用原生 sql 进行操作

到这就结束了,希望这些封装,可以满足各种复杂业务场景下的需求,提高数据库操作的灵活性和适用性,从而提高我们的开发效率。让代码变得更简单。

Github源代码

源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/570050.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Github进行fork后如何与原仓库同步

前言 fork了一个仓库以后怎么同步源仓库的代码? 步骤 1、执行命令 git remote -v 查看你的远程仓库的路径。 以一个实际例子说明, 来源仓库: TheFirstLineOfCode/basaltgit remote -v得到: origin https://github.com/ghmi…

“亚马逊依赖”之下,傲基科技的品牌势能如何提升?

受益于出口政策红利、低人工成本、完善的供应链以及成熟的生产工艺优势,近年来我国家具出口行业迅速发展。 数据显示,我国家具出口规模1995年仅为11.06亿美元,至2023年增至641.96亿美元。随着出口规模持续扩大,相关企业积极走入公…

Java高级阶段面试题库(Redis数据库、MQ消息队列、kafka、SpringBoot + SpringCloud、MySQL、JVMJUC、其它)

文章目录 1. Redis数据库篇(忽略)1.1 简单介绍一下redis1.2 单线程的redis为什么读写速度快?1.3 redis为什么是单线程的?1.4 redis服务器的的内存是多大?1.5 为什么Redis的操作是原子性的,怎么保证原子性的?1.6 你还用过其他的缓存吗?这些…

基于深度学习的车牌识别

如果你认为车牌只是车子的‘名字’,那么是时候让你见识一下,当科技赋予它‘超能力’时会发生什么? 上效果图; 这就是车牌识别的力量,下面是主函数代码: # -*- coding: UTF-8 -*- import argparse import …

使用d3.js画一个BoxPlot

Box Plot 在画Box Plot之前,先来了解下Box Plot是什么? 箱线图(Box Plot)也称盒须图、盒式图或箱型图,是一种用于展示数据分布特征的统计图表。 它由以下几个部分组成: 箱子:表示数据的四分…

阶段性学习汇报 4月19日

目录 一、毕业设计和毕业论文 二、学习python和vue 三、阅读知识图谱 四、下周规划 一、毕业设计和毕业论文 毕业设计后端功能基本实现,但是还有些具体的细节需要优化。前端小程序部分只有个前端页面以及部分交互逻辑,还需进一步完善。在疾病预测这里本…

3d模型合并怎么样不丢材质?---模大狮模型网

在3D设计中,合并模型是常见的操作,它可以帮助设计师将多个单独的模型组合成一个,从而简化场景并提高渲染效率。然而,合并模型时常常会面临一个棘手的问题:如何确保合并后的模型不丢失原有的材质?本文将探讨如何在合并…

电力调度自动化系统由什么构成?

电力调度自动化系统由什么构成? 电力调度自动化系统通过数据采集与传输、数据处理与存储、监视与控制、优化与决策、通信网络和系统应用软件等构成,实现对电力系统的监控、控制和优化。 电力调度自动化系统是一种集成了计算机技术、通信技术、自动化技术…

推荐5款我每次系统重装必装的软件

​ 你电脑中用的最久的软件是哪些?以下是否有你曾经使用过的软件呢?工欲善其事,必先利其器,今天继续分享五款实用的办公软件。 1.素材管理——Billfish ​ Billfish是一款专业的素材管理工具,适用于设计师、摄影师等…

2023中国便利店TOP100公示

转载来源:中国连锁经营协会

unity 录制360全景渲染图

1.打开pakcageManager ,选择packages为 unityRegisty,找到unityRecorder插件下载,点击右下角instant安装,如果插件列表为空,检查是否连接网络,重启Unity 2.打开录制面板 3.add recorder 选择ImageSequence …

风险防不胜防?看YashanDB如何守护你的数据库安全(上篇)

数据库作为信息系统的核心,不仅承载着海量的关键数据,还负责向各类用户提供高效、可靠的信息服务,数据库的安全性显得尤为关键,已成为信息安全体系的重中之重。 什么是数据库安全? 数据库安全是数据安全的一个子集&a…

制造业小企业内部小程序简单设计

也没什么需求,就是看企业内部外来单位就餐还需要打印客饭单拿去食堂给打饭师傅,出门单还需要打印纸质版,车间PDA扫码出问题需要人手动处理,会议室需要OA申请,但是申请前不知道哪些会议室事空的(因为不是每个人都下载OA…

毫米波雷达模块用于海洋生态环境监测的技术方案研究

海洋生态环境是地球上重要的自然资源之一,对其进行监测和保护具有重要意义。毫米波雷达技术作为一种先进的感知技术,在海洋生态环境监测中具有广阔的应用前景。本文将探讨毫米波雷达模块用于海洋生态环境监测的技术方案,包括其原理、应用场景…

el-select下拉框远程搜索且多选时,编辑需要回显的一个简单案例

前端业务开发中不管使用vue2~3,还是react,angular各种前端技术栈,经常会遇到这种业务。一个下拉框Select中,不仅需要需要支持远程模糊搜索,还需要支持多选。并且在编辑时,还能正常把已经多选好的内容回显到…

redis主从复制,无法从redis读取最新的数据

目录 一、场景二、redis连接配置三、排查四、原因五、解决 一、场景 1、redis为主从复制模式 2、采用读写分离(主节点写入,从节点读取) 3、最新数据成功写入主节点,但从节点没有同步最新的数据 二、redis连接配置 #主节点 spr…

Linux——进程基本概念下篇

Linux——进程基本概念下篇 文章目录 Linux——进程基本概念下篇一、环境变量1.1 环境变量的定义1.2 环境变量的相关命令1.3 命令行参数1.4 本地变量和环境变量1.5 常规命令和内建命令 二、进程地址空间2.1 地址空间的概念2.2 页表和MMU2.3 地址空间的作用2.4 地址空间的好处 一…

Docker容器:docker基础

目录 一.docker前言 云计算服务模式 关于 docker 产品 虚拟化产品有哪些? ① 寄居架构 ② 源生架构 虚拟化技术概述 主流虚拟化产品对比 1. VMware系列 2. KVM/OpenStack 3. Xen 4. 其他半/全虚拟化产品 二. docker 的相关知识 1. docker 的概念 doc…

【古琴】倪诗韵古琴雷修系列(形制挺多的)

雷音系列雷修:“修”字取意善、美好的,更有“使之完美”之意。精品桐木或普通杉木制,栗壳色,纯鹿角霜生漆工艺。 方形龙池凤沼。红木配件,龙池上方有“倪诗韵”亲笔签名,凤沼下方,雁足上方居中位…

码头船只出行及配套货柜码放管理系统-毕设

毕业设计说明书 码头船只出行及配套货柜码放 管理系统 码头船只出行及配套货柜码放管理系统 摘要 伴随着全球化的发展,码头的物流和客运增多,码头业务迎来新的高峰。然而码头业务的增加,导致了人员成本和工作量的增多。为了解决这一基本问题&…