动手实现一个Python版本的MysqlClient

背景

在实际项目中我们会在项目的各个角落进行数据库访问操作,显然我们需要将对数据库的访问做一定的封装,在统一管理的基础上也可以做一些额外的优化。
所以我们打算实现一个叫做MysqlClient的类对数据库访问做统一管理,我们期待它的作用有:

  • 统一管理数据库访问凭证
  • queryexecute两类操作进行封装
  • 预处理sql以防 SQL注入
  • 引入数据库连接池
  • 合适的错误检查与记录
  • 以及如何正确使用

在百度上随便一搜就能搜到类似的实现,不过他们大部分都是只实现了对queryexecute的封装,少部分加上了预处理sql,我希望提供一个更完整的MysqlClient实现。

我最初也是在百度上复制了一个封装了query、execute两个操作的MysqlClient,后来被公司安全部门提示有sql注入风险,所幸将平常遇到的对数据库访问对象的需求一次考虑清楚设计一个比较完善的实现。

上代码

Python 3.7,遵循 PEP8 代码风格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# encoding: utf-8
"""
@desc: MySQL client. Manage the access evidence, wrap the common action, improve the performance, keep the security.
"""
import MySQLdb
import glog
from DBUtils.PooledDB import PooledDB
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# database configuration
__DATABASE_HOST = "127.0.0.1"
__DATABASE_PORT = "3306"
__DATABASE_USER = "root"
__DATABASE_PASSWORD = "password"
__DATABASE_NAME = "demo"

# database connection pool configuration
__NUM_DEFAULT_CONNECTION = 10


def get_mysql_connection_pool():
"""
Database connection to the server using the connection pool.
:return: database connection
"""
db_host = __DATABASE_HOST
db_port = __DATABASE_PORT
db_user = __DATABASE_USER
db_password = __DATABASE_PASSWORD
db_name = __DATABASE_NAME

num_connection = __NUM_DEFAULT_CONNECTION

connection_pool = PooledDB(MySQLdb, num_connection,
host=db_host, port=db_port, user=db_user, passwd=db_password, db=db_name)

return connection_pool


def get_session():
"""
Used by ORM framework.
"""
db_host = __DATABASE_HOST
db_port = __DATABASE_PORT
db_user = __DATABASE_USER
db_password = __DATABASE_PASSWORD
db_name = __DATABASE_NAME

engine = create_engine('mysql+mysqlconnector://'
+ db_user + ':' + db_password + '@' + db_host + ':' + db_port + '/' + db_name)
da_session = sessionmaker(bind=engine)
session = da_session()
return session


def watch_prepared_statement(sql, params):
"""
When we use prepared sql statement, the program doesn't the real sql neither in the python program or
database process. This function will parse the expected sql statement for debugging.
"""
expected_sql = sql % tuple(params)
return expected_sql


class MysqlClient(object):
"""
Package actions about the database.
"""
def __init__(self):
self.connection_pool = get_mysql_connection_pool()

def query(self, sql, params=None):
"""
Query action.
:param sql: being executed sql statement
:param params: sql param, to avoid the sql injection
:return: Query Result
:rtype: list
"""
connection = self.connection_pool.connection()
cursor = connection.cursor()

if params is None:
params = []
try:
cursor.execute(str(sql), params)
result = cursor.fetchall()
return result
except Exception as e:
expected_sql = watch_prepared_statement(sql, params)
glog.error("[" + str(e) + "]" + expected_sql)
result = None
finally:
# release the connection to the pooled
cursor.close()
connection.close()

return result

def update(self, sql, params=None):
"""
Update action.
:param sql: being executed sql statement
:param params: sql param, to avoid the sql injection
:return: affected rows count and the last row id
:rtype: tuple
"""
connection = self.connection_pool.connection()
cursor = connection.cursor()

if params is None:
params = []
try:
cursor.execute(str(sql), params)
row_count = cursor.rowcount
row_id = cursor.lastrowid
connection.commit()
result = row_count, row_id
except Exception as e:
expected_sql = watch_prepared_statement(sql, params)
glog.error("[" + str(e) + "]" + expected_sql)
result = None
finally:
# release the connection to the pooled
cursor.close()
connection.close()

return result

def get_connection(self):
"""
Other actions. Return the connection to finish other operations, such as executemany and so on.
"""
return self.connection_pool.connection()


__mysql_client = MysqlClient()


def get_mysql_client():
"""
Initialize the singleton object.
"""
return __mysql_client

总结

  • 外部访问时通过调用get_mysql_client()函数来获得单例的 MysqlClient。

    实现单例模式最简单的方法:在模块内定义变量,模块被import时被初始化

  • watch_prepared_statement()函数用于观察预处理后的sql语句

    通俗地说,预处理就是把 % 变为 ,

  • get_session()是相对独立的一部分,在使用如 SQLAlchemy 这样的ORM框架时会用到。

  • 提供了get_connection()用于完成不常用的数据库操作,当然如果在项目层面常用也可以增加额外的封装方法。
  • 数据库连接池部分还可以对cache,连接数做额外的配置,这个可以根据项目实际需求做改动。
  • 对配置信息部分,还可以引入ConfigParser对其进行统一管理在配置文件。