专业编程基础技术教程

网站首页 > 基础教程 正文

基于数据库结构生成Web服务之四:MySQL-Python-WebAPI服务

ccvgpt 2025-01-10 11:58:57 基础教程 1 ℃

结合前述设计(详细参考文末参考资料),我已经在Gitee给出一个初步实现,命名为JYGT-CODER。目前可以使用此工具基于MySQL的数据库表结构自动生成GraphQL风格的六类WebAPI(getAll,getByIndex,add,update,replace,delete)。使用这个工具较为简单,总体而言分为三步骤:

  1. 建数据库: 在Gitee项目的src/db/mysql/目录下有MySQL数据库SQL脚本。
  2. 生成代码:使用JYGT-CODER生成WebAPI服务工程代码
  3. 启动服务:启动后就可以使用浏览器访问graphiql

完成之后看下效果,这里对操作步骤进行详细介绍。

基于数据库结构生成Web服务之四:MySQL-Python-WebAPI服务

建数据库

数据库作为此工具的输入项,需要先创建好。如下是使用示例SQL语句创建的数据库db_jygt,可以使用EA设计好数据库后自动生成。

/*
演示用MySQL数据库脚本

## 作者: 修炼者 7457222@qq.com 
## 日期: 2024-12-08
## 描述: 用以生成数据库表的实体类
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for t_button
-- ----------------------------
DROP TABLE IF EXISTS `t_button`;
CREATE TABLE `t_button`  (
  `f_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键:唯一标识特定表的一个记录',
  `f_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '菜单的名称',
  `f_group_id` int(11) NULL DEFAULT NULL COMMENT '分组id',
  `f_label` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '显示的名称',
  `f_icon` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '显示的图标',
  `f_action` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '点击后的动作',
  `f_paras` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '点击后action携带的参数',
  `f_desc` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '资源的描述',
  `f_state` int(11) NOT NULL DEFAULT 0 COMMENT '状态(可用1/不可用0)',
  `f_create_time` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `f_modify_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0),
  PRIMARY KEY (`f_id`) USING BTREE,
  INDEX `idx_group_id`(`f_group_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 101 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

假定

生成代码

从https://gitee.com/wapuboy/jygt-coder克隆项目到本地。首先需要安装python环境,本文使用的是python --version Python 3.13.0。进入到项目根目录:

PS D:\work\gitee\jygt-coder> .\bin\start.bat
Active code page: 65001
D:\work\gitee\jygt-coder\
正在安装依赖包...
Requirement already satisfied: Flask==3.1.0 in d:\work\gitee\jygt-coder\.venv\lib\site-packages (from -r D:\work\gitee\jygt-coder\src\requirements.txt (line 1)) (3.1.0)
......
[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip
'YTHONPATH' is not recognized as an internal or external command,
operable program or batch file.
usage: cmdline.py [-h] [-i DB_HOST] [-p DB_PORT] [-u DB_USER] [-s DB_PASSWORD] [-d DB_DATABASE] [-I API_HOST]
                  [-P API_PORT] [-U API_PATH] [-r TAR_ROOT_DIR] [-v TAR_VER]

JYGT-CODER 命令行工具,用于获取任务描述,并读取数据库表、字段和索引信息.

options:
  -h, --help            show this help message and exit
  -i, --db_host DB_HOST
                        数据库地址
  -p, --db_port DB_PORT
                        数据库端口
  -u, --db_user DB_USER
                        数据库用户名
  -s, --db_password DB_PASSWORD
                        数据库密码
  -d, --db_database DB_DATABASE
                        数据库名称
  -I, --api_host API_HOST
                        生成的WebAPI运行所在的地址
  -P, --api_port API_PORT
                        生成的WebAPI运行所在的端口
  -U, --api_path API_PATH
                        生成的WebAPI的URL根路径
  -r, --tar_root_dir TAR_ROOT_DIR
                        目标代码存放根路径,默认为项目根目录下dat/dist/目录
  -v, --tar_ver TAR_VER
                        目标代码版本号,默认为1.0.0

假如MySQL的db_jygt数据库部署在 127.0.0.1 3306 上,用户是root、口令是root_pwd,可使用如下指令生成WebAPI工程。

.\bin\start.bat -i 127.0.0.1 -p 3306 -u root -s root_pwd -d db_jygt
Active code page: 65001
D:\work\gitee\jygt-coder\
正在安装依赖包...
Requirement already satisfied: Flask==3.1.0 in d:\work\gitee\jygt-coder\.venv\lib\site-packages (from -r D:\work\gitee\jygt-coder\src\requirements.txt (line 1)) (3.1.0)
。。。
[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip
'YTHONPATH' is not recognized as an internal or external command,
operable program or batch file.

########## JYGT-CODER 准备处理下面任务:##########
<Task(
        db_info=<DBInfo 127.0.0.1:3306/db_jygt>,
        api_info=<WebAPIInfo(host=localhost, port=5000, path=/api>,
        target_project_info=<TargetProjectInfo(dir=D:\work\gitee\jygt-coder\dat\dist\db_jygt,ver=1.0.0)>
)>

1. 加载数据库,并初始化目标工程目录D:\work\gitee\jygt-coder\dat\dist\db_jygt
         << 加载表数量: 20
         << 初始化目标工程目录 D:\work\gitee\jygt-coder\dat\dist\db_jygt
------------------------------------------------------------
db_jygt/
    readme.md
    bin/
        readme.md
    dat/
        readme.md
    doc/
        readme.md
    log/
        readme.md
    src/
        requirements.txt
        public/
            jygt_coder_common.py
            jygt_coder_loger.py
            jygt_coder_utitls.py
        static/
            fetch.min.js
            graphiql.css
            graphiql.min.js
            index.html
            react-dom.min.js
            react.min.js
    tmp/
        readme.md
------------------------------------------------------------

2. 生成代码
         << 生成成功

########## JYGT-CODER 任务处理完毕##########

启动服务

进入D:\work\gitee\jygt-coder\dat\dist\db_jygt目录,执行如下指令,即可启动工程。

 python .\dat\dist\db_jygt\src\main.py
 * Serving Flask app 'main'
 * Debug mode: on
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://localhost:5000
Press CTRL+C to quit
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 101-595-054

查看WebAPI

使用浏览器访问 http://localhost:5000,可看到如下页面


  1. getAll${table_name}
getAllTUser(page: 0, number: 100) {
    currentPage
    total
    pages
    items {
      fId
      fUid
      fName
      fMotto
      fAuthCode
      fAvartaUrl
      fCreateTime
      fModifyTime
      fUserSourceId
    }
  }
  1. getBy${index_name}Of${table_name}
# UnqSourceAndUid 是一个唯一索引的名称
  getByUnqSourceAndUidOfTUser(paraFUserSourceId:10,paraFUid:"test_f_uidryhvImOADa"){
    fId
    fUid
    fName
    fMotto
    fAuthCode
    fAvartaUrl
    fCreateTime
    fModifyTime
    fUserSourceId
  }
  # IdxSource 是一个普通索引的名称
  getByIdxSourceOfTUser(paraFUserSourceId: 10) {
    fId
    fUid
    fName
    fMotto
    fAuthCode
    fAvartaUrl
    fCreateTime
    fModifyTime
    fUserSourceId
  }
  # 基于主键fId查询
  getByPrimaryOfTUser(paraFId: 12) {
    fId
    fUid
    fName
    fMotto
    fAuthCode
    fAvartaUrl
    fCreateTime
    fModifyTime
    fUserSourceId
  }
  1. add${table_name}
mutation {
  addTUser(
    inTUserGqlObjects: [
      {
        fId: 1012, 
        fUid: "test_f_uidryhvImOADa_10", 
        fName: "test_f_nameDdLDALNScf", 
        fMotto: "test_f_mottomfhfFToPjf", 
        fAuthCode: "test_f_auth_codeLmVHWQfeVK", 
        fAvartaUrl: "test_f_avarta_urliFrsLFuLXA", 
        fCreateTime: "2024-03-10T00:00:00", 
        fModifyTime: "2021-08-08T00:00:00", 
        fUserSourceId: 10
      },
      {
        fId: 3012, 
        fUid: "test_f_uidryhvImOADa_30", 
        fName: "test_f_nameDdLDALNScf", 
        fMotto: "test_f_mottomfhfFToPjf", 
        fAuthCode: "test_f_auth_codeLmVHWQfeVK", 
        fAvartaUrl: "test_f_avarta_urliFrsLFuLXA", 
        fCreateTime: "2024-03-10T00:00:00", 
        fModifyTime: "2021-08-08T00:00:00", 
        fUserSourceId: 10
      }
    ]
  ) {
    res {
      fId
      fName
    }
  }
}
  1. update${table_name}
mutation {
  updateTUser(inTUserGqlObjects: [
    {
      fId: 2012, 
      fUid: "test_f_uidryhvImOADa_2020", 
      fName: "test_f_nameDdLDALNScf_10", fMotto: "test_f_mottomfhfFToPjf", fAuthCode: "test_f_auth_codeLmVHWQfeVK", fAvartaUrl: "test_f_avarta_urliFrsLFuLXA", fCreateTime: "2024-03-10T00:00:00", fModifyTime: "2021-08-08T00:00:00", fUserSourceId: 10}]) {
    res {
      fId
      fName
    }
  }
}
  1. replace${table_name}
mutation {
  replaceTUser(inTUserGqlObjects: [
    {
      fId: 1012, 
      fUid: "test_f_uidryhvImOADa_10", 
      fName: "test_f_nameDdLDALNScf_10", fMotto: "test_f_mottomfhfFToPjf", fAuthCode: "test_f_auth_codeLmVHWQfeVK", fAvartaUrl: "test_f_avarta_urliFrsLFuLXA", fCreateTime: "2024-03-10T00:00:00", fModifyTime: "2021-08-08T00:00:00", fUserSourceId: 10}]) {
    res {
      fId
      fName
    }
  }
}
  1. delete${table_name}
mutation {
  deleteTUser(fIds:[12,26,24]) {
    res {
      fId
      fName
    }
  }
}

测试数据库API

使用pytest测试SQLAlchemy的数据库API。API的文件如下:

#
# 此代码为自动化代码工具【JYGT-CODER】自动生成的文件,请勿手动编辑。              
#                                                               
# 作者: 修炼者 7457222@qq.com                                    
# 日期: 2024-12-16 23:03:18                                        
#
from sqlalchemy import Column, Integer, String, DateTime,Float,Text,BigInteger,Numeric,Date,Time,Boolean,LargeBinary,func
from sqlalchemy.orm import declarative_base
from datetime import datetime

Base = declarative_base()

class TButtonEntity(Base):
    __tablename__ = 't_button'
    f_id = Column(Integer, primary_key=True, nullable=False)
    f_name = Column(String(64),  nullable=False)
    f_group_id = Column(Integer,  nullable=True)
    f_label = Column(String(64),  nullable=True)
    f_icon = Column(String(128),  nullable=True)
    f_action = Column(String(128),  nullable=True)
    f_paras = Column(String(512),  nullable=True)
    f_desc = Column(String(512),  nullable=True)
    f_state = Column(Integer,  nullable=False,  default='0')
    f_create_time = Column(DateTime,  nullable=False,  default=func.now)
    f_modify_time = Column(DateTime,  nullable=False,  default=func.now)

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

    def __repr__(self):
        attrs = ', '.join(f"{key}={value}" for key, value in vars(self).items() if not key.startswith('_'))
        return f"TButtonEntity({attrs})"

#
# 此代码为自动化代码工具【JYGT-CODER】自动生成的文件,请勿手动编辑。              
#                                                               
# 作者: 修炼者 7457222@qq.com                                    
# 日期: 2024-12-16 23:03:18                                        
#
from connection.MydbPool import mydb_pool_session, mydb_transaction, mydb_copy
from entity.TButtonEntity import TButtonEntity

import re
from sqlalchemy import or_, and_
from public.jygt_coder_loger import get_logger 

logger = get_logger(__name__)

class TButtonService:

    @mydb_pool_session
    def getAll(self, page=0, per_page=10):
        """
        获取所有 TButtonService 记录,支持分页。

        :param page: 当前页码,默认为 1。
        :param per_page: 每页记录数,默认为 10。
        :return: 包含总记录数、总页数、当前页码和记录列表的字典。
        """
        try:
            logger.info(f"{self.__class__.__name__}.getAll")
            items = self.session.query(TButtonEntity
                ).limit(per_page).offset(page * per_page).all()

            total = self.session.query(TButtonEntity).count()

            pages = (total + per_page - 1) // per_page

            return {
                'total': total,
                'pages': pages,
                'current_page': page,
                'items': items
            }
        except Exception as e:
            logger.error(f"Error fetching all TButtonEntity records: {e}")
            raise
    
    @mydb_pool_session
    def _get_by_filters(self, filters: list) -> list:
        """
        根据给定的过滤条件查询 TButtonEntity 记录。

        :param filters: 过滤条件列表。
        :return: 查询结果列表。
        """
        try:
            query = self.session.query(TButtonEntity).filter(and_(*filters))
            return query.all()
        except Exception as e:
            logger.error(f"Error fetching TButtonEntity records with filters {filters}: {e}")
            raise

    @mydb_pool_session
    def getByPrimary(self, f_id):
        """
        根据 表t_button索引 PRIMARY  获取记录。

        输入参数:
                f_id
        
        输出参数:
                TButtonEntity对象或 None
        """
        
        logger.info(f"{self.__class__.__name__}.getByPrimary")
        
        filters = [
                TButtonEntity.f_id == f_id,
            1==1
        ]

        results = self._get_by_filters(filters)
        return results[0] if results else None
    @mydb_pool_session
    def getByIdxGroupId(self, f_group_id):
        """
        根据 表t_button索引 idx_group_id  获取记录。

        输入参数:
                f_group_id
        
        输出参数:
                TButtonEntity列表或空
        """
        
        logger.info(f"{self.__class__.__name__}.getByIdxGroupId")
        
        filters = [
                TButtonEntity.f_group_id == f_group_id,
            1==1
        ]

        return self._get_by_filters(filters)
    @mydb_pool_session
    @mydb_transaction
    def replace(self, records):
        """
        批量替换数据,当数据存在时更新,当数据不存在时增加。

        :param records: 要替换的数据列表。
        :return: 替换后的数据列表。
        """
        if not isinstance(records, list):
            records = [records]
        
        requests = []
        
        for record in records:
            obj = TButtonEntity(**record)

            if record['f_id']:
                objOld = self.getByPrimary(f_id=record['f_id'])
                if objOld is None:
                    self.session.add(obj)
                    requests.append(obj)
                else:
                    mydb_copy(obj, objOld)
                    self.session.merge(objOld)
                    requests.append(objOld)
            else:
                self.session.add(obj)
                requests.append(obj)
        return requests
    @mydb_pool_session
    @mydb_transaction
    def add(self, records):
        """
        批量增加数据,当数据不存在时增加。

        :param records: 要增加的数据列表。
        :return: 增加后的数据列表。
        """
        if not isinstance(records, list):
            records = [records]
        
        requests = []
        
        for record in records:
            obj = TButtonEntity(**record)

            if record['f_id']:
                objOld = self.getByPrimary(f_id=record['f_id'])
                if objOld is None:
                    self.session.add(obj)
                    requests.append(obj)
            else:
                self.session.add(obj)
                requests.append(obj)
        return requests
    @mydb_pool_session
    @mydb_transaction
    def update(self, records):
        """
        批量修改数据,当数据存在时修改。

        :param records: 要修改的数据列表。
        :return: 修改后的数据列表。
        """
        if not isinstance(records, list):
            records = [records]
        
        requests = []
        
        for record in records:
            obj = TButtonEntity(**record)

            if record['f_id']:
                objOld = self.getByPrimary(f_id=record['f_id'])
                if objOld :
                    mydb_copy(obj, objOld)
                    self.session.merge(objOld)
                    requests.append(objOld)            
        return requests
    @mydb_pool_session
    def delete(self, f_ids):
        """
        删除指定主键集合的记录。

        输入参数:
            单值 f_id
            集合 [f_id,...]
        
        输出参数:
            删除的实体对象列表 [TButtonEntity]
        """
        if not isinstance(f_ids, list):
            f_ids = [f_ids]
        
        deleted_items = []
        for f_id in f_ids:
            try:
                oldRecord = self.getByPrimary(f_id=f_id)
                if oldRecord is None:
                    continue

                self.session.delete(oldRecord)
                deleted_items.append(oldRecord)
            
            except Exception as exp:
                logger.error(exp)
                self.session.rollback()
        
        self.session.commit()
        return deleted_items

#
# 此代码为自动化代码工具【JYGT-CODER】自动生成的文件,请勿手动编辑。              
#                                                               
# 作者: 修炼者 7457222@qq.com                                    
# 日期: 2024-12-16 23:03:18                                        
#
import os
import sys
import random

from datetime import datetime, timedelta

start_date = datetime(2020, 1, 1)
end_date = datetime(2024, 12, 31)

def generate_random_date():
    delta = end_date - start_date
    random_days = random.randint(0, delta.days)
    random_date = start_date + timedelta(days=random_days)
    return random_date


# 添加当前文件所在目录到Python路径中
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)

# 添加包的父目录到Python路径中
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)


import pytest

from service.TButtonService import TButtonService
from entity.TButtonEntity import TButtonEntity


@pytest.fixture
def getObj():
    return TButtonService()

columns = [{'name': 'f_id', 'type': 'Integer', 'gql_type': 'graphene.Int', 'gql_required': 'required=False', 'nullable': 'nullable=False', 'default': 'default=None', 'comment': "'auto_increment'", 'primary_key': 'primary_key=True,'}, {'name': 'f_name', 'type': 'String(64)', 'gql_type': 'graphene.String', 'gql_required': 'required=False', 'nullable': 'nullable=False', 'default': 'default=None', 'comment': "''", 'primary_key': ''}, {'name': 'f_group_id', 'type': 'Integer', 'gql_type': 'graphene.Int', 'gql_required': 'required=True', 'nullable': 'nullable=True', 'default': 'default=None', 'comment': "''", 'primary_key': ''}, {'name': 'f_label', 'type': 'String(64)', 'gql_type': 'graphene.String', 'gql_required': 'required=True', 'nullable': 'nullable=True', 'default': 'default=None', 'comment': "''", 'primary_key': ''}, {'name': 'f_icon', 'type': 'String(128)', 'gql_type': 'graphene.String', 'gql_required': 'required=True', 'nullable': 'nullable=True', 'default': 'default=None', 'comment': "''", 'primary_key': ''}, {'name': 'f_action', 'type': 'String(128)', 'gql_type': 'graphene.String', 'gql_required': 'required=True', 'nullable': 'nullable=True', 'default': 'default=None', 'comment': "''", 'primary_key': ''}, {'name': 'f_paras', 'type': 'String(512)', 'gql_type': 'graphene.String', 'gql_required': 'required=True', 'nullable': 'nullable=True', 'default': 'default=None', 'comment': "''", 'primary_key': ''}, {'name': 'f_desc', 'type': 'String(512)', 'gql_type': 'graphene.String', 'gql_required': 'required=True', 'nullable': 'nullable=True', 'default': 'default=None', 'comment': "''", 'primary_key': ''}, {'name': 'f_state', 'type': 'Integer', 'gql_type': 'graphene.Int', 'gql_required': 'required=False', 'nullable': 'nullable=False', 'default': "default='0'", 'comment': "''", 'primary_key': ''}, {'name': 'f_create_time', 'type': 'DateTime', 'gql_type': 'graphene.String', 'gql_required': 'required=False', 'nullable': 'nullable=False', 'default': 'default=func.now', 'comment': "''", 'primary_key': ''}, {'name': 'f_modify_time', 'type': 'DateTime', 'gql_type': 'graphene.String', 'gql_required': 'required=False', 'nullable': 'nullable=False', 'default': 'default=func.now', 'comment': "'on update CURRENT_TIMESTAMP'", 'primary_key': ''}]

import string

def generate_random_string(length):
    letters = string.ascii_letters  # Includes both uppercase and lowercase letters
    random_string = ''.join(random.choice(letters) for _ in range(length))
    return random_string

def generate_test_data(columns):
    res = []
    while len(res) < 10:
        test_data = {}
        for column in columns:
            col_name = column['name']
            col_type = column['type']
            if col_type == 'Integer':
                test_data[col_name] = random.randint(1, 100)
            elif col_type.startswith('String'):
                test_data[col_name] = f'test_{col_name}{generate_random_string(10)}'
            elif col_type == 'Text':
                test_data[col_name] = f'test_{col_name}{generate_random_string(10)}'
            elif col_type == 'DateTime':
                test_data[col_name] = generate_random_date()
            elif col_type == 'Boolean':
                test_data[col_name] = True
            else:
                test_data[col_name] = None
        res.append(test_data)
    return res

def generate_test_data_primary(columns):
    res = []
    while len(res) < 10:
        test_data = ''
        for column in columns:
            col_name = column['name']
            col_type = column['type']
            col_primary_key = column['primary_key']

            if col_primary_key.startswith('primary_key'):
                if col_type == 'Integer':
                    test_data = random.randint(1, 100)
                elif col_type.startswith('String'):
                    test_data = f'test_{col_name}{generate_random_string(10)}'
                elif col_type == 'Text':
                    test_data = f'test_{col_name}{generate_random_string(10)}'
                elif col_type == 'DateTime':
                    test_data = generate_random_date()
                elif col_type == 'Boolean':
                    test_data = True
                else:
                    test_data = None
        res.append(test_data)
    return res

def test_getAll(getObj):

    res = getObj.getAll()

    if res['total'] == 0:
        return
    
    print(f"total: {res['total']},pages: {res['pages']},current_page: {res['current_page']}")

    for itm in res["items"]:
        print(itm)
        assert isinstance(itm, TButtonEntity)

def test_add_and_get_by(getObj):
    """
    测试 add 方法和 getBy 方法。
    """
    test_data = generate_test_data(columns)

    # 添加数据
    test_data_ed =  getObj.add(test_data)

    if len(test_data_ed) == 0:
        return

    # 获取数据
    primary_key = getattr(test_data_ed[0],'f_id')
    retrieved_data = getObj.getByPrimary(primary_key)
   
    assert retrieved_data is not None

    for column in columns:
        col_name = column['name']
        assert getattr(retrieved_data, col_name) == getattr(test_data_ed[0], col_name)
    

    
def test_update_and_get_by(getObj):
    """
    测试 add 方法和 getBy 方法。
    """
    test_data = generate_test_data(columns)

    # 添加数据
    test_data_ed =  getObj.update(test_data)

    if len(test_data_ed) == 0:
        return

    # 获取数据
    primary_key = getattr(test_data_ed[0],'f_id')
    retrieved_data = getObj.getByPrimary(primary_key)
   
    assert retrieved_data is not None

    for column in columns:
        col_name = column['name']
        assert getattr(retrieved_data, col_name) == getattr(test_data_ed[0], col_name)
    

    


def test_replace_and_get_by(getObj):
    """
    测试 add 方法和 getBy 方法。
    """
    test_data = generate_test_data(columns)

    # 添加数据
    test_data_ed =  getObj.replace(test_data)

    assert len(test_data_ed) == len(test_data)

    # 获取数据
    primary_key = getattr(test_data_ed[0],'f_id')
    retrieved_data = getObj.getByPrimary(primary_key)
   
    assert retrieved_data is not None
    
    for column in columns:
        col_name = column['name']
        assert getattr(retrieved_data, col_name) == getattr(test_data_ed[0], col_name)
    


def test_delete(getObj):
    """
    测试 delete 方法。
    """
    test_data = generate_test_data_primary(columns)

    # 添加数据
    test_data_ed = getObj.delete(test_data)

    for itm in test_data_ed:
        primary_key = getattr(test_data_ed[0],'f_id')
        # 获取删除后的数据
        retrieved_data = getObj.getByPrimary(primary_key)
        assert retrieved_data is None

    

        
pytest --html=./html/report.html
=================================================================== test session starts ===================================================================
platform win32 -- Python 3.13.0, pytest-8.3.3, pluggy-1.5.0
rootdir: D:\work\gitee\jygt-coder\dat\dist\db_jygt
plugins: html-4.1.1, metadata-3.1.1
collected 100 items

src\test\test_TButtonService.py .....                                                                                                                [  5%]
src\test\test_TElementService.py .....                                                                                                               [ 10%]
src\test\test_TFieldContentService.py .....                                                                                                          [ 15%]
src\test\test_TFieldService.py .....                                                                                                                 [ 20%]
src\test\test_TGroupService.py .....                                                                                                                 [ 25%]
src\test\test_TInterfaceService.py .....                                                                                                             [ 30%]
src\test\test_TKyxTemplateService.py .....                                                                                                           [ 35%]
src\test\test_TMenuService.py .....                                                                                                                  [ 40%]
src\test\test_TPageService.py .....                                                                                                                  [ 45%]
src\test\test_TRelationRelationService.py .....                                                                                                      [ 50%]
src\test\test_TRelationService.py .....                                                                                                              [ 55%]
src\test\test_TResourceService.py .....                                                                                                              [ 60%]
src\test\test_TRoleResourceService.py .....                                                                                                          [ 65%]
src\test\test_TRoleService.py .....                                                                                                                  [ 70%]
src\test\test_TUserKydService.py .....                                                                                                               [ 75%]
src\test\test_TUserLogService.py .....                                                                                                               [ 80%]
src\test\test_TUserRelationService.py .....                                                                                                          [ 85%]
src\test\test_TUserRoleService.py .....                                                                                                              [ 90%]
src\test\test_TUserService.py .....                                                                                                                  [ 95%]
src\test\test_TUserSourceService.py .....                                                                                                            [100%]

-------------------------------- Generated html report: file:///D:/work/gitee/jygt-coder/dat/dist/db_jygt/html/report.html --------------------------------
=================================================================== 100 passed in 7.32s ===================================================================
PS D:\work\gitee\jygt-coder\dat\dist\db_jygt>




参考资料

  1. JYGT-CODER: https://gitee.com/wapuboy/jygt-coder
  2. 基于关系数据库结构生成Web服务接口之一:需求与调研
  3. 基于数据库结构生成Web服务接口之二:设计与验证
  4. 基于数据库结构生成Web服务接口之三:MVP
  5. 使用Enterprise Architect设计数据库

Tags:

最近发表
标签列表