1. 为什么需要封装Sqlite3操作
每次直接操作数据库都要重复写连接、执行SQL、提交事务、关闭连接这些代码,实在太麻烦了。我刚开始用Python操作Sqlite3时,经常忘记写conn.commit()导致数据没保存,或者漏掉conn.close()造成资源泄漏。后来发现把这些操作封装成类,不仅能避免低级错误,还能让代码更整洁。
想象一下,就像你去餐厅吃饭,每次都要自己买菜、洗菜、炒菜、洗碗,多累啊。封装好的数据库类就像是专业厨师,你只需要告诉它"我要一份青椒肉丝",它就能把整套流程处理好。
2. 基础封装:数据库连接管理
先来看最基本的封装——连接管理。这个类要处理两件事:建立连接和关闭连接。我习惯把数据库文件路径作为参数传入,这样更灵活:
import sqlite3 from pathlib import Path class Database: def __init__(self, db_path='app.db'): self.db_path = Path(db_path) self.conn = None self.connect() def connect(self): """创建数据库连接""" try: self.conn = sqlite3.connect(self.db_path) print(f"成功连接到数据库 {self.db_path}") except sqlite3.Error as e: print(f"连接数据库失败: {e}") raise def close(self): """关闭数据库连接""" if self.conn: self.conn.close() print("数据库连接已关闭") def __enter__(self): """支持with语句""" return self def __exit__(self, exc_type, exc_val, exc_tb): """退出with块时自动关闭连接""" self.close()这里用了几个实用技巧:
- 使用pathlib处理路径,比直接传字符串更安全
- 添加了连接状态检查,避免重复连接
- 实现了上下文管理器(enter/exit),可以用with语句自动管理连接
测试一下这个基础版:
with Database('test.db') as db: # 这里可以执行数据库操作 pass # with块结束会自动关闭连接3. 完整CRUD封装实战
3.1 增删查改方法实现
现在给Database类添加核心的CRUD方法。为了避免SQL注入,所有SQL语句都使用参数化查询:
class Database: # ... 前面代码不变 ... def execute(self, sql, params=()): """执行SQL语句并自动提交""" try: cursor = self.conn.cursor() cursor.execute(sql, params) self.conn.commit() return cursor except sqlite3.Error as e: self.conn.rollback() print(f"执行SQL失败: {e}") raise def create_table(self, table_name, columns): """ 创建表 :param table_name: 表名 :param columns: 列定义字典 {'id':'INTEGER PRIMARY KEY','name':'TEXT'} """ cols = ", ".join([f"{k} {v}" for k, v in columns.items()]) sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({cols})" self.execute(sql) def insert(self, table_name, data): """ 插入数据 :param data: 字典格式数据 {'name':'张三','age':20} """ columns = ", ".join(data.keys()) placeholders = ", ".join(["?"] * len(data)) sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" self.execute(sql, tuple(data.values())) def query(self, table_name, where=None, params=(), fetch_all=True): """ 查询数据 :param where: WHERE子句,例如 "age > ?" :param params: WHERE条件参数 :param fetch_all: 是否获取全部结果 """ sql = f"SELECT * FROM {table_name}" if where: sql += f" WHERE {where}" cursor = self.execute(sql, params) return cursor.fetchall() if fetch_all else cursor.fetchone() def update(self, table_name, data, where=None, params=()): """ 更新数据 :param data: 要更新的字段 {'name':'李四','age':21} :param where: WHERE条件子句 """ set_clause = ", ".join([f"{k}=?" for k in data.keys()]) sql = f"UPDATE {table_name} SET {set_clause}" if where: sql += f" WHERE {where}" self.execute(sql, tuple(data.values()) + tuple(params)) def delete(self, table_name, where=None, params=()): """删除数据""" sql = f"DELETE FROM {table_name}" if where: sql += f" WHERE {where}" self.execute(sql, params)3.2 使用示例
现在用这个封装好的类来操作数据库:
# 初始化 db = Database('mydb.db') # 创建用户表 db.create_table('users', { 'id': 'INTEGER PRIMARY KEY AUTOINCREMENT', 'name': 'TEXT NOT NULL', 'age': 'INTEGER', 'email': 'TEXT UNIQUE' }) # 插入数据 user1 = {'name': '张三', 'age': 25, 'email': 'zhangsan@example.com'} user2 = {'name': '李四', 'age': 30, 'email': 'lisi@example.com'} db.insert('users', user1) db.insert('users', user2) # 查询数据 all_users = db.query('users') print("所有用户:", all_users) adults = db.query('users', 'age > ?', (25,)) print("大于25岁的用户:", adults) # 更新数据 db.update('users', {'age': 26}, 'name = ?', ('张三',)) # 删除数据 db.delete('users', 'age < ?', (28,)) db.close()4. 高级技巧:事务与异常处理
4.1 事务管理
数据库操作最怕中途出错导致数据不一致。比如转账操作需要同时修改两个账户的余额,必须保证要么都成功,要么都失败。这时候就需要事务:
def transfer_money(db, from_id, to_id, amount): """转账函数""" try: # 开始事务 db.execute("BEGIN TRANSACTION") # 扣款 db.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_id)) # 存款 db.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_id)) # 提交事务 db.execute("COMMIT") print("转账成功") except sqlite3.Error as e: db.execute("ROLLBACK") print(f"转账失败: {e}") raise4.2 健壮的异常处理
数据库操作可能遇到各种异常:连接失败、SQL语法错误、约束冲突等。好的封装应该能妥善处理这些情况:
class Database: # ... 前面代码不变 ... def execute(self, sql, params=()): """增强版的执行方法""" try: cursor = self.conn.cursor() cursor.execute(sql, params) self.conn.commit() return cursor except sqlite3.IntegrityError as e: self.conn.rollback() print(f"数据完整性错误: {e}") raise DatabaseError("数据已存在或违反约束") from e except sqlite3.OperationalError as e: self.conn.rollback() print(f"操作错误: {e}") raise DatabaseError("数据库操作失败") from e except sqlite3.Error as e: self.conn.rollback() print(f"数据库错误: {e}") raise DatabaseError("未知数据库错误") from e class DatabaseError(Exception): """自定义数据库异常""" pass4.3 重试机制
对于临时性错误(如数据库被锁定),可以添加重试逻辑:
import time def execute_with_retry(self, sql, params=(), max_retries=3, delay=0.1): """带重试的执行方法""" for attempt in range(max_retries): try: return self.execute(sql, params) except sqlite3.OperationalError as e: if "locked" in str(e) and attempt < max_retries - 1: time.sleep(delay) continue raise5. 性能优化技巧
5.1 批量操作
用executemany处理批量插入,比循环插入快10倍以上:
def bulk_insert(self, table_name, rows): """批量插入数据""" if not rows: return columns = ", ".join(rows[0].keys()) placeholders = ", ".join(["?"] * len(rows[0])) sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" data = [tuple(row.values()) for row in rows] try: cursor = self.conn.cursor() cursor.executemany(sql, data) self.conn.commit() return cursor.rowcount except sqlite3.Error as e: self.conn.rollback() raise5.2 连接池优化
频繁创建关闭连接很耗资源。对于Web应用,可以使用连接池:
from queue import Queue class ConnectionPool: def __init__(self, db_path, pool_size=5): self.db_path = db_path self.pool = Queue(pool_size) for _ in range(pool_size): conn = sqlite3.connect(db_path) self.pool.put(conn) def get_conn(self): return self.pool.get() def release_conn(self, conn): self.pool.put(conn) def close_all(self): while not self.pool.empty(): conn = self.pool.get() conn.close()5.3 索引优化
为常用查询字段添加索引可以大幅提升查询速度:
def create_index(self, table_name, column_name, unique=False): """创建索引""" index_name = f"idx_{table_name}_{column_name}" unique_clause = "UNIQUE" if unique else "" sql = f"CREATE {unique_clause} INDEX {index_name} ON {table_name}({column_name})" self.execute(sql)6. 实际项目中的应用建议
在实际项目中,我建议这样组织数据库代码:
- 按功能模块分表,每个表对应一个模型类
- 把数据库操作封装在模型类的方法中
- 使用类型注解提高代码可读性
- 编写单元测试验证数据库操作
例如用户模型可以这样实现:
class User: def __init__(self, db): self.db = db def create_table(self): """创建用户表""" self.db.create_table('users', { 'id': 'INTEGER PRIMARY KEY AUTOINCREMENT', 'username': 'TEXT UNIQUE NOT NULL', 'password': 'TEXT NOT NULL', 'created_at': 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP' }) self.db.create_index('users', 'username') def add_user(self, username, password): """添加用户""" try: self.db.insert('users', { 'username': username, 'password': password # 实际项目应该存储哈希值 }) return True except DatabaseError: return False def get_user(self, username): """获取用户信息""" return self.db.query('users', 'username = ?', (username,), fetch_all=False)这种架构让代码更清晰,也更容易维护。当业务逻辑变更时,只需要修改对应的模型类,不会影响其他部分的代码。