Skip to content

PostgreSQL 应用层读写分离

应用层读写分离概述

什么是应用层读写分离

应用层读写分离是指在应用程序代码中实现读写操作的分离,将写入操作发送到主库,将读取操作发送到备库。这种方式相比中间件层读写分离,具有以下特点:

  • 更高的灵活性和可控性
  • 可以根据业务逻辑精细控制读写路由
  • 无需引入额外的中间件
  • 对数据库架构依赖较少

适用场景

  • 应用架构已经稳定,不希望引入新的中间件
  • 需要根据业务逻辑定制读写分离策略
  • 对读写分离的灵活性要求较高
  • 小规模应用,读写分离需求相对简单

实现方式

应用层读写分离主要有两种实现方式:

  1. 基于ORM框架的读写分离

    • 利用ORM框架提供的读写分离功能
    • 配置简单,易于集成
    • 适合使用ORM框架的应用
  2. 基于数据库客户端的读写分离

    • 直接在数据库客户端层面实现读写分离
    • 灵活性更高,适合复杂场景
    • 适合不使用ORM框架的应用

基于ORM框架的读写分离

Django ORM 读写分离

1. 配置DATABASES

在Django项目的settings.py中配置数据库:

python
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2',
        'NAME': 'mydb',
        'USER': 'appuser',
        'PASSWORD': 'apppassword',
        'HOST': '主库IP',
        'PORT': '5432',
    },
    'readonly': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2',
        'NAME': 'mydb',
        'USER': 'appuser',
        'PASSWORD': 'apppassword',
        'HOST': '备库IP',
        'PORT': '5432',
    }
}

# 配置数据库路由
DATABASE_ROUTERS = ['myapp.routers.ReadWriteRouter']

2. 创建数据库路由

创建myapp/routers.py文件,实现读写分离路由:

python
class ReadWriteRouter:
    """数据库读写分离路由"""
    
    def db_for_read(self, model, **hints):
        """读取操作使用readonly数据库"""
        return 'readonly'
    
    def db_for_write(self, model, **hints):
        """写入操作使用default数据库"""
        return 'default'
    
    def allow_relation(self, obj1, obj2, **hints):
        """允许所有关系"""
        return True
    
    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """只在default数据库执行迁移"""
        return db == 'default'

3. 手动指定数据库

在需要时可以手动指定数据库:

python
# 使用主库写入
User.objects.using('default').create(username='test')

# 使用备库读取
User.objects.using('readonly').filter(is_active=True)

Spring Boot JPA 读写分离

1. 配置数据源

application.yml中配置主从数据源:

yaml
spring:
  datasource:
    primary:
      url: jdbc:postgresql://主库IP:5432/mydb
      username: appuser
      password: apppassword
      driver-class-name: org.postgresql.Driver
    secondary:
      url: jdbc:postgresql://备库IP:5432/mydb
      username: appuser
      password: apppassword
      driver-class-name: org.postgresql.Driver

  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true

2. 配置数据源切换

创建数据源配置类:

java
@Configuration
public class DataSourceConfig {
    
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource.primary")
    public DataSource primaryDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.secondary")
    public DataSource secondaryDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public DynamicDataSource dynamicDataSource(@Qualifier("primaryDataSource") DataSource primaryDataSource,
                                             @Qualifier("secondaryDataSource") DataSource secondaryDataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DataSourceType.PRIMARY.name(), primaryDataSource);
        targetDataSources.put(DataSourceType.SECONDARY.name(), secondaryDataSource);
        
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        dynamicDataSource.setTargetDataSources(targetDataSources);
        dynamicDataSource.setDefaultTargetDataSource(primaryDataSource);
        
        return dynamicDataSource;
    }
}

3. 实现动态数据源

java
public class DynamicDataSource extends AbstractRoutingDataSource {
    
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceType();
    }
}

public class DataSourceContextHolder {
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
    
    public static void setDataSourceType(String dataSourceType) {
        CONTEXT_HOLDER.set(dataSourceType);
    }
    
    public static String getDataSourceType() {
        return CONTEXT_HOLDER.get();
    }
    
    public static void clearDataSourceType() {
        CONTEXT_HOLDER.remove();
    }
}

4. 实现读写分离注解

java
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataSource {
    String value() default DataSourceType.PRIMARY;
}

public class DataSourceType {
    public static final String PRIMARY = "PRIMARY";
    public static final String SECONDARY = "SECONDARY";
}

5. 实现AOP切面

java
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("@annotation(com.example.demo.annotation.DataSource)")
    public void beforeSwitchDataSource(JoinPoint point) {
        MethodSignature signature = (MethodSignature) point.getSignature();
        DataSource dataSource = signature.getMethod().getAnnotation(DataSource.class);
        
        if (dataSource != null) {
            DataSourceContextHolder.setDataSourceType(dataSource.value());
        }
    }
    
    @After("@annotation(com.example.demo.annotation.DataSource)")
    public void afterSwitchDataSource(JoinPoint point) {
        DataSourceContextHolder.clearDataSourceType();
    }
}

6. 使用示例

java
@Service
public class UserService {
    
    @Autowired
    private UserRepository userRepository;
    
    @DataSource(DataSourceType.PRIMARY)
    public User saveUser(User user) {
        return userRepository.save(user);
    }
    
    @DataSource(DataSourceType.SECONDARY)
    public List<User> findAllUsers() {
        return userRepository.findAll();
    }
}

Laravel Eloquent 读写分离

1. 配置数据库

config/database.php中配置主从数据库:

php
'pgsql' => [
    'read' => [
        'host' => ['备库1IP', '备库2IP'],
    ],
    'write' => [
        'host' => ['主库IP'],
    ],
    'driver' => 'pgsql',
    'database' => 'mydb',
    'username' => 'appuser',
    'password' => 'apppassword',
    'charset' => 'utf8',
    'prefix' => '',
    'prefix_indexes' => true,
    'search_path' => 'public',
    'sslmode' => 'prefer',
],

2. 使用示例

php
// 自动路由到主库(写入)
$user = new User;
$user->name = 'test';
$user->save();

// 自动路由到备库(读取)
$users = User::all();

// 手动指定连接
$users = DB::connection('pgsql')->select('select * from users');

基于数据库客户端的读写分离

Python psycopg2 读写分离

1. 实现连接池管理

python
import psycopg2
from psycopg2 import pool

class PostgresConnectionPool:
    def __init__(self):
        # 主库连接池(用于写入)
        self.write_pool = psycopg2.pool.SimpleConnectionPool(
            1, 20,  # minconn, maxconn
            host='主库IP',
            port=5432,
            database='mydb',
            user='appuser',
            password='apppassword'
        )
        
        # 备库连接池(用于读取)
        self.read_pool = psycopg2.pool.SimpleConnectionPool(
            1, 50,  # minconn, maxconn
            host='备库IP',
            port=5432,
            database='mydb',
            user='appuser',
            password='apppassword'
        )
    
    def get_write_connection(self):
        """获取主库连接"""
        return self.write_pool.getconn()
    
    def get_read_connection(self):
        """获取备库连接"""
        return self.read_pool.getconn()
    
    def put_back(self, conn, is_write=True):
        """归还连接到对应连接池"""
        if is_write:
            self.write_pool.putconn(conn)
        else:
            self.read_pool.putconn(conn)
    
    def close_all_connections(self):
        """关闭所有连接"""
        self.write_pool.closeall()
        self.read_pool.closeall()

# 创建全局连接池实例
db_pool = PostgresConnectionPool()

2. 实现读写分离装饰器

python
from functools import wraps

def write_operation(func):
    """写入操作装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        conn = db_pool.get_write_connection()
        try:
            cursor = conn.cursor()
            result = func(cursor, *args, **kwargs)
            conn.commit()
            return result
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            cursor.close()
            db_pool.put_back(conn, is_write=True)
    return wrapper

def read_operation(func):
    """读取操作装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        conn = db_pool.get_read_connection()
        try:
            cursor = conn.cursor()
            result = func(cursor, *args, **kwargs)
            return result
        except Exception as e:
            raise e
        finally:
            cursor.close()
            db_pool.put_back(conn, is_write=False)
    return wrapper

3. 使用示例

python
@write_operation
def create_user(cursor, name, email):
    """创建用户(写入操作)"""
    sql = "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id;"
    cursor.execute(sql, (name, email))
    return cursor.fetchone()[0]

@read_operation
def get_user(cursor, user_id):
    """获取用户(读取操作)"""
    sql = "SELECT * FROM users WHERE id = %s;"
    cursor.execute(sql, (user_id,))
    return cursor.fetchone()

@read_operation
def get_all_users(cursor):
    """获取所有用户(读取操作)"""
    sql = "SELECT * FROM users;"
    cursor.execute(sql)
    return cursor.fetchall()

# 使用示例
user_id = create_user("test", "test@example.com")
user = get_user(user_id)
all_users = get_all_users()

Java JDBC 读写分离

1. 实现连接管理

java
public class PostgreSQLConnectionManager {
    private static final String WRITE_URL = "jdbc:postgresql://主库IP:5432/mydb";
    private static final String READ_URL = "jdbc:postgresql://备库IP:5432/mydb";
    private static final String USER = "appuser";
    private static final String PASSWORD = "apppassword";
    
    // 连接池配置
    private static final int MAX_POOL_SIZE = 100;
    private static final int MIN_IDLE = 10;
    private static final long MAX_WAIT_MILLIS = 30000;
    
    // HikariCP连接池
    private static HikariDataSource writeDataSource;
    private static HikariDataSource readDataSource;
    
    static {
        // 初始化主库连接池
        HikariConfig writeConfig = new HikariConfig();
        writeConfig.setJdbcUrl(WRITE_URL);
        writeConfig.setUsername(USER);
        writeConfig.setPassword(PASSWORD);
        writeConfig.setMaximumPoolSize(MAX_POOL_SIZE);
        writeConfig.setMinimumIdle(MIN_IDLE);
        writeConfig.setMaxWaitMillis(MAX_WAIT_MILLIS);
        writeConfig.setDriverClassName("org.postgresql.Driver");
        writeDataSource = new HikariDataSource(writeConfig);
        
        // 初始化备库连接池
        HikariConfig readConfig = new HikariConfig();
        readConfig.setJdbcUrl(READ_URL);
        readConfig.setUsername(USER);
        readConfig.setPassword(PASSWORD);
        readConfig.setMaximumPoolSize(MAX_POOL_SIZE);
        readConfig.setMinimumIdle(MIN_IDLE);
        readConfig.setMaxWaitMillis(MAX_WAIT_MILLIS);
        readConfig.setDriverClassName("org.postgresql.Driver");
        readDataSource = new HikariDataSource(readConfig);
    }
    
    public static Connection getWriteConnection() throws SQLException {
        return writeDataSource.getConnection();
    }
    
    public static Connection getReadConnection() throws SQLException {
        return readDataSource.getConnection();
    }
    
    public static void close() {
        if (writeDataSource != null) {
            writeDataSource.close();
        }
        if (readDataSource != null) {
            readDataSource.close();
        }
    }
}

2. 使用示例

java
public class UserDao {
    
    public void createUser(User user) throws SQLException {
        String sql = "INSERT INTO users (name, email) VALUES (?, ?)";
        
        try (Connection conn = PostgreSQLConnectionManager.getWriteConnection();
             PreparedStatement pstmt = conn.prepareStatement(sql)) {
            
            pstmt.setString(1, user.getName());
            pstmt.setString(2, user.getEmail());
            pstmt.executeUpdate();
        }
    }
    
    public User getUserById(int id) throws SQLException {
        String sql = "SELECT * FROM users WHERE id = ?";
        User user = null;
        
        try (Connection conn = PostgreSQLConnectionManager.getReadConnection();
             PreparedStatement pstmt = conn.prepareStatement(sql)) {
            
            pstmt.setInt(1, id);
            try (ResultSet rs = pstmt.executeQuery()) {
                if (rs.next()) {
                    user = new User();
                    user.setId(rs.getInt("id"));
                    user.setName(rs.getString("name"));
                    user.setEmail(rs.getString("email"));
                }
            }
        }
        
        return user;
    }
}

读写分离策略设计

静态读写分离策略

  1. 固定路由策略

    • 所有写入操作走主库
    • 所有读取操作走备库
    • 实现简单,适合大部分场景
    • 示例:Django ORM默认读写分离
  2. 权重路由策略

    • 为多个备库设置不同的权重
    • 根据权重分配读取请求
    • 适合备库性能不一致的场景
    • 示例:Laravel Eloquent的read配置

动态读写分离策略

  1. 基于SQL类型的路由

    • 解析SQL语句,根据SQL类型路由
    • SELECT语句走备库,其他语句走主库
    • 适合复杂SQL场景
    • 示例:pgpool-II的读写分离
  2. 基于事务的路由

    • 事务内所有操作走主库
    • 非事务操作根据类型路由
    • 避免事务内数据不一致
    • 示例:Spring Boot的@Transactional注解
  3. 基于业务逻辑的路由

    • 根据业务逻辑动态选择数据源
    • 适合复杂业务场景
    • 示例:电商系统中,订单操作走主库,商品查询走备库
  4. 基于数据一致性的路由

    • 刚写入的数据,一段时间内走主库读取
    • 避免复制延迟导致的数据不一致
    • 示例:使用缓存记录写入时间,短时间内优先读主库

常见问题与解决方案

1. 复制延迟导致的数据不一致

问题现象

  • 写入主库后立即读取备库,获取不到最新数据
  • 应用层出现数据不一致的错误

解决方案

  • 实现基于数据一致性的路由策略
  • 为关键业务设置读取主库的规则
  • 监控复制延迟,超过阈值时自动切换到主库读取
  • 调整PostgreSQL参数,减少复制延迟

2. 备库故障导致读取失败

问题现象

  • 备库宕机,导致读取操作失败
  • 应用层出现连接错误

解决方案

  • 实现备库健康检查机制
  • 配置多个备库,实现自动故障转移
  • 当备库不可用时,自动切换到主库读取
  • 使用连接池,配置连接超时和重试机制

3. 事务内读写分离导致的数据不一致

问题现象

  • 事务内先写入后读取,读取到的数据不是最新的
  • 事务内数据一致性被破坏

解决方案

  • 实现基于事务的路由策略
  • 事务内所有操作走主库
  • 使用@Transactional注解标记事务方法
  • 避免长事务,减少锁定时间

4. 复杂SQL路由错误

问题现象

  • 复杂SQL被错误路由到备库
  • 备库执行失败,导致应用错误

解决方案

  • 优化SQL解析逻辑,准确识别SQL类型
  • 为复杂SQL手动指定数据源
  • 使用注解或配置标记需要走主库的SQL
  • 定期审查SQL路由规则

最佳实践

1. 监控复制延迟

sql
-- 主库查看复制延迟
SELECT 
    application_name AS 备库名称,
    client_addr AS 备库IP,
    pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) / 1024 / 1024 AS 回放延迟_MB,
    now() - replay_lag AS 时间延迟
FROM pg_stat_replication;

2. 实现备库健康检查

python
def check_primary_health():
    """检查主库健康状态"""
    try:
        conn = psycopg2.connect(
            host='主库IP',
            port=5432,
            database='mydb',
            user='appuser',
            password='apppassword',
            connect_timeout=5
        )
        conn.close()
        return True
    except:
        return False

def check_standby_health():
    """检查备库健康状态"""
    try:
        conn = psycopg2.connect(
            host='备库IP',
            port=5432,
            database='mydb',
            user='appuser',
            password='apppassword',
            connect_timeout=5
        )
        conn.close()
        return True
    except:
        return False

3. 实现动态数据源切换

python
class DynamicPostgresConnectionPool:
    def __init__(self):
        self.write_pool = self._create_pool('主库IP')
        self.read_pool = self._create_pool('备库IP')
        self.standby_healthy = True
    
    def _create_pool(self, host):
        return psycopg2.pool.SimpleConnectionPool(
            1, 50,
            host=host,
            port=5432,
            database='mydb',
            user='appuser',
            password='apppassword'
        )
    
    def get_read_connection(self):
        """获取读取连接,备库不可用时自动切换到主库"""
        if self.standby_healthy:
            try:
                return self.read_pool.getconn()
            except:
                self.standby_healthy = False
        # 备库不可用,使用主库
        return self.write_pool.getconn()

4. 优化PostgreSQL参数

# 主库参数
wal_level = replica
max_wal_senders = 10
wal_keep_size = 1GB

# 备库参数
hot_standby = on
max_standby_streaming_delay = 30s
hot_standby_feedback = on

版本差异注意事项

PostgreSQL 9.x

  • 流复制功能相对基础
  • 复制延迟较高
  • 不支持逻辑复制
  • 建议使用基于事务的路由策略

PostgreSQL 10+

  • 支持逻辑复制
  • 复制延迟降低
  • 支持pg_stat_replication中的replay_lag字段
  • 建议使用基于数据一致性的路由策略

PostgreSQL 12+

  • 支持write_lag、flush_lag、replay_lag字段
  • 复制性能进一步提升
  • 支持并行复制
  • 建议使用权重路由策略