外观
PostgreSQL 应用层读写分离
应用层读写分离概述
什么是应用层读写分离
应用层读写分离是指在应用程序代码中实现读写操作的分离,将写入操作发送到主库,将读取操作发送到备库。这种方式相比中间件层读写分离,具有以下特点:
- 更高的灵活性和可控性
- 可以根据业务逻辑精细控制读写路由
- 无需引入额外的中间件
- 对数据库架构依赖较少
适用场景
- 应用架构已经稳定,不希望引入新的中间件
- 需要根据业务逻辑定制读写分离策略
- 对读写分离的灵活性要求较高
- 小规模应用,读写分离需求相对简单
实现方式
应用层读写分离主要有两种实现方式:
基于ORM框架的读写分离
- 利用ORM框架提供的读写分离功能
- 配置简单,易于集成
- 适合使用ORM框架的应用
基于数据库客户端的读写分离
- 直接在数据库客户端层面实现读写分离
- 灵活性更高,适合复杂场景
- 适合不使用ORM框架的应用
基于ORM框架的读写分离
Django ORM 读写分离
1. 配置DATABASES
在Django项目的settings.py中配置数据库:
python
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'mydb',
'USER': 'appuser',
'PASSWORD': 'apppassword',
'HOST': '主库IP',
'PORT': '5432',
},
'readonly': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'mydb',
'USER': 'appuser',
'PASSWORD': 'apppassword',
'HOST': '备库IP',
'PORT': '5432',
}
}
# 配置数据库路由
DATABASE_ROUTERS = ['myapp.routers.ReadWriteRouter']2. 创建数据库路由
创建myapp/routers.py文件,实现读写分离路由:
python
class ReadWriteRouter:
"""数据库读写分离路由"""
def db_for_read(self, model, **hints):
"""读取操作使用readonly数据库"""
return 'readonly'
def db_for_write(self, model, **hints):
"""写入操作使用default数据库"""
return 'default'
def allow_relation(self, obj1, obj2, **hints):
"""允许所有关系"""
return True
def allow_migrate(self, db, app_label, model_name=None, **hints):
"""只在default数据库执行迁移"""
return db == 'default'3. 手动指定数据库
在需要时可以手动指定数据库:
python
# 使用主库写入
User.objects.using('default').create(username='test')
# 使用备库读取
User.objects.using('readonly').filter(is_active=True)Spring Boot JPA 读写分离
1. 配置数据源
在application.yml中配置主从数据源:
yaml
spring:
datasource:
primary:
url: jdbc:postgresql://主库IP:5432/mydb
username: appuser
password: apppassword
driver-class-name: org.postgresql.Driver
secondary:
url: jdbc:postgresql://备库IP:5432/mydb
username: appuser
password: apppassword
driver-class-name: org.postgresql.Driver
jpa:
hibernate:
ddl-auto: update
show-sql: true2. 配置数据源切换
创建数据源配置类:
java
@Configuration
public class DataSourceConfig {
@Bean
@Primary
@ConfigurationProperties(prefix = "spring.datasource.primary")
public DataSource primaryDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.secondary")
public DataSource secondaryDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public DynamicDataSource dynamicDataSource(@Qualifier("primaryDataSource") DataSource primaryDataSource,
@Qualifier("secondaryDataSource") DataSource secondaryDataSource) {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceType.PRIMARY.name(), primaryDataSource);
targetDataSources.put(DataSourceType.SECONDARY.name(), secondaryDataSource);
DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setTargetDataSources(targetDataSources);
dynamicDataSource.setDefaultTargetDataSource(primaryDataSource);
return dynamicDataSource;
}
}3. 实现动态数据源
java
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceType();
}
}
public class DataSourceContextHolder {
private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
public static void setDataSourceType(String dataSourceType) {
CONTEXT_HOLDER.set(dataSourceType);
}
public static String getDataSourceType() {
return CONTEXT_HOLDER.get();
}
public static void clearDataSourceType() {
CONTEXT_HOLDER.remove();
}
}4. 实现读写分离注解
java
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataSource {
String value() default DataSourceType.PRIMARY;
}
public class DataSourceType {
public static final String PRIMARY = "PRIMARY";
public static final String SECONDARY = "SECONDARY";
}5. 实现AOP切面
java
@Aspect
@Component
public class DataSourceAspect {
@Before("@annotation(com.example.demo.annotation.DataSource)")
public void beforeSwitchDataSource(JoinPoint point) {
MethodSignature signature = (MethodSignature) point.getSignature();
DataSource dataSource = signature.getMethod().getAnnotation(DataSource.class);
if (dataSource != null) {
DataSourceContextHolder.setDataSourceType(dataSource.value());
}
}
@After("@annotation(com.example.demo.annotation.DataSource)")
public void afterSwitchDataSource(JoinPoint point) {
DataSourceContextHolder.clearDataSourceType();
}
}6. 使用示例
java
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
@DataSource(DataSourceType.PRIMARY)
public User saveUser(User user) {
return userRepository.save(user);
}
@DataSource(DataSourceType.SECONDARY)
public List<User> findAllUsers() {
return userRepository.findAll();
}
}Laravel Eloquent 读写分离
1. 配置数据库
在config/database.php中配置主从数据库:
php
'pgsql' => [
'read' => [
'host' => ['备库1IP', '备库2IP'],
],
'write' => [
'host' => ['主库IP'],
],
'driver' => 'pgsql',
'database' => 'mydb',
'username' => 'appuser',
'password' => 'apppassword',
'charset' => 'utf8',
'prefix' => '',
'prefix_indexes' => true,
'search_path' => 'public',
'sslmode' => 'prefer',
],2. 使用示例
php
// 自动路由到主库(写入)
$user = new User;
$user->name = 'test';
$user->save();
// 自动路由到备库(读取)
$users = User::all();
// 手动指定连接
$users = DB::connection('pgsql')->select('select * from users');基于数据库客户端的读写分离
Python psycopg2 读写分离
1. 实现连接池管理
python
import psycopg2
from psycopg2 import pool
class PostgresConnectionPool:
def __init__(self):
# 主库连接池(用于写入)
self.write_pool = psycopg2.pool.SimpleConnectionPool(
1, 20, # minconn, maxconn
host='主库IP',
port=5432,
database='mydb',
user='appuser',
password='apppassword'
)
# 备库连接池(用于读取)
self.read_pool = psycopg2.pool.SimpleConnectionPool(
1, 50, # minconn, maxconn
host='备库IP',
port=5432,
database='mydb',
user='appuser',
password='apppassword'
)
def get_write_connection(self):
"""获取主库连接"""
return self.write_pool.getconn()
def get_read_connection(self):
"""获取备库连接"""
return self.read_pool.getconn()
def put_back(self, conn, is_write=True):
"""归还连接到对应连接池"""
if is_write:
self.write_pool.putconn(conn)
else:
self.read_pool.putconn(conn)
def close_all_connections(self):
"""关闭所有连接"""
self.write_pool.closeall()
self.read_pool.closeall()
# 创建全局连接池实例
db_pool = PostgresConnectionPool()2. 实现读写分离装饰器
python
from functools import wraps
def write_operation(func):
"""写入操作装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
conn = db_pool.get_write_connection()
try:
cursor = conn.cursor()
result = func(cursor, *args, **kwargs)
conn.commit()
return result
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
db_pool.put_back(conn, is_write=True)
return wrapper
def read_operation(func):
"""读取操作装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
conn = db_pool.get_read_connection()
try:
cursor = conn.cursor()
result = func(cursor, *args, **kwargs)
return result
except Exception as e:
raise e
finally:
cursor.close()
db_pool.put_back(conn, is_write=False)
return wrapper3. 使用示例
python
@write_operation
def create_user(cursor, name, email):
"""创建用户(写入操作)"""
sql = "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id;"
cursor.execute(sql, (name, email))
return cursor.fetchone()[0]
@read_operation
def get_user(cursor, user_id):
"""获取用户(读取操作)"""
sql = "SELECT * FROM users WHERE id = %s;"
cursor.execute(sql, (user_id,))
return cursor.fetchone()
@read_operation
def get_all_users(cursor):
"""获取所有用户(读取操作)"""
sql = "SELECT * FROM users;"
cursor.execute(sql)
return cursor.fetchall()
# 使用示例
user_id = create_user("test", "test@example.com")
user = get_user(user_id)
all_users = get_all_users()Java JDBC 读写分离
1. 实现连接管理
java
public class PostgreSQLConnectionManager {
private static final String WRITE_URL = "jdbc:postgresql://主库IP:5432/mydb";
private static final String READ_URL = "jdbc:postgresql://备库IP:5432/mydb";
private static final String USER = "appuser";
private static final String PASSWORD = "apppassword";
// 连接池配置
private static final int MAX_POOL_SIZE = 100;
private static final int MIN_IDLE = 10;
private static final long MAX_WAIT_MILLIS = 30000;
// HikariCP连接池
private static HikariDataSource writeDataSource;
private static HikariDataSource readDataSource;
static {
// 初始化主库连接池
HikariConfig writeConfig = new HikariConfig();
writeConfig.setJdbcUrl(WRITE_URL);
writeConfig.setUsername(USER);
writeConfig.setPassword(PASSWORD);
writeConfig.setMaximumPoolSize(MAX_POOL_SIZE);
writeConfig.setMinimumIdle(MIN_IDLE);
writeConfig.setMaxWaitMillis(MAX_WAIT_MILLIS);
writeConfig.setDriverClassName("org.postgresql.Driver");
writeDataSource = new HikariDataSource(writeConfig);
// 初始化备库连接池
HikariConfig readConfig = new HikariConfig();
readConfig.setJdbcUrl(READ_URL);
readConfig.setUsername(USER);
readConfig.setPassword(PASSWORD);
readConfig.setMaximumPoolSize(MAX_POOL_SIZE);
readConfig.setMinimumIdle(MIN_IDLE);
readConfig.setMaxWaitMillis(MAX_WAIT_MILLIS);
readConfig.setDriverClassName("org.postgresql.Driver");
readDataSource = new HikariDataSource(readConfig);
}
public static Connection getWriteConnection() throws SQLException {
return writeDataSource.getConnection();
}
public static Connection getReadConnection() throws SQLException {
return readDataSource.getConnection();
}
public static void close() {
if (writeDataSource != null) {
writeDataSource.close();
}
if (readDataSource != null) {
readDataSource.close();
}
}
}2. 使用示例
java
public class UserDao {
public void createUser(User user) throws SQLException {
String sql = "INSERT INTO users (name, email) VALUES (?, ?)";
try (Connection conn = PostgreSQLConnectionManager.getWriteConnection();
PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, user.getName());
pstmt.setString(2, user.getEmail());
pstmt.executeUpdate();
}
}
public User getUserById(int id) throws SQLException {
String sql = "SELECT * FROM users WHERE id = ?";
User user = null;
try (Connection conn = PostgreSQLConnectionManager.getReadConnection();
PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setInt(1, id);
try (ResultSet rs = pstmt.executeQuery()) {
if (rs.next()) {
user = new User();
user.setId(rs.getInt("id"));
user.setName(rs.getString("name"));
user.setEmail(rs.getString("email"));
}
}
}
return user;
}
}读写分离策略设计
静态读写分离策略
固定路由策略
- 所有写入操作走主库
- 所有读取操作走备库
- 实现简单,适合大部分场景
- 示例:Django ORM默认读写分离
权重路由策略
- 为多个备库设置不同的权重
- 根据权重分配读取请求
- 适合备库性能不一致的场景
- 示例:Laravel Eloquent的read配置
动态读写分离策略
基于SQL类型的路由
- 解析SQL语句,根据SQL类型路由
- SELECT语句走备库,其他语句走主库
- 适合复杂SQL场景
- 示例:pgpool-II的读写分离
基于事务的路由
- 事务内所有操作走主库
- 非事务操作根据类型路由
- 避免事务内数据不一致
- 示例:Spring Boot的@Transactional注解
基于业务逻辑的路由
- 根据业务逻辑动态选择数据源
- 适合复杂业务场景
- 示例:电商系统中,订单操作走主库,商品查询走备库
基于数据一致性的路由
- 刚写入的数据,一段时间内走主库读取
- 避免复制延迟导致的数据不一致
- 示例:使用缓存记录写入时间,短时间内优先读主库
常见问题与解决方案
1. 复制延迟导致的数据不一致
问题现象:
- 写入主库后立即读取备库,获取不到最新数据
- 应用层出现数据不一致的错误
解决方案:
- 实现基于数据一致性的路由策略
- 为关键业务设置读取主库的规则
- 监控复制延迟,超过阈值时自动切换到主库读取
- 调整PostgreSQL参数,减少复制延迟
2. 备库故障导致读取失败
问题现象:
- 备库宕机,导致读取操作失败
- 应用层出现连接错误
解决方案:
- 实现备库健康检查机制
- 配置多个备库,实现自动故障转移
- 当备库不可用时,自动切换到主库读取
- 使用连接池,配置连接超时和重试机制
3. 事务内读写分离导致的数据不一致
问题现象:
- 事务内先写入后读取,读取到的数据不是最新的
- 事务内数据一致性被破坏
解决方案:
- 实现基于事务的路由策略
- 事务内所有操作走主库
- 使用@Transactional注解标记事务方法
- 避免长事务,减少锁定时间
4. 复杂SQL路由错误
问题现象:
- 复杂SQL被错误路由到备库
- 备库执行失败,导致应用错误
解决方案:
- 优化SQL解析逻辑,准确识别SQL类型
- 为复杂SQL手动指定数据源
- 使用注解或配置标记需要走主库的SQL
- 定期审查SQL路由规则
最佳实践
1. 监控复制延迟
sql
-- 主库查看复制延迟
SELECT
application_name AS 备库名称,
client_addr AS 备库IP,
pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) / 1024 / 1024 AS 回放延迟_MB,
now() - replay_lag AS 时间延迟
FROM pg_stat_replication;2. 实现备库健康检查
python
def check_primary_health():
"""检查主库健康状态"""
try:
conn = psycopg2.connect(
host='主库IP',
port=5432,
database='mydb',
user='appuser',
password='apppassword',
connect_timeout=5
)
conn.close()
return True
except:
return False
def check_standby_health():
"""检查备库健康状态"""
try:
conn = psycopg2.connect(
host='备库IP',
port=5432,
database='mydb',
user='appuser',
password='apppassword',
connect_timeout=5
)
conn.close()
return True
except:
return False3. 实现动态数据源切换
python
class DynamicPostgresConnectionPool:
def __init__(self):
self.write_pool = self._create_pool('主库IP')
self.read_pool = self._create_pool('备库IP')
self.standby_healthy = True
def _create_pool(self, host):
return psycopg2.pool.SimpleConnectionPool(
1, 50,
host=host,
port=5432,
database='mydb',
user='appuser',
password='apppassword'
)
def get_read_connection(self):
"""获取读取连接,备库不可用时自动切换到主库"""
if self.standby_healthy:
try:
return self.read_pool.getconn()
except:
self.standby_healthy = False
# 备库不可用,使用主库
return self.write_pool.getconn()4. 优化PostgreSQL参数
# 主库参数
wal_level = replica
max_wal_senders = 10
wal_keep_size = 1GB
# 备库参数
hot_standby = on
max_standby_streaming_delay = 30s
hot_standby_feedback = on版本差异注意事项
PostgreSQL 9.x
- 流复制功能相对基础
- 复制延迟较高
- 不支持逻辑复制
- 建议使用基于事务的路由策略
PostgreSQL 10+
- 支持逻辑复制
- 复制延迟降低
- 支持pg_stat_replication中的replay_lag字段
- 建议使用基于数据一致性的路由策略
PostgreSQL 12+
- 支持write_lag、flush_lag、replay_lag字段
- 复制性能进一步提升
- 支持并行复制
- 建议使用权重路由策略
