Skip to content

PostgreSQL 逻辑复制

逻辑复制是PostgreSQL 10引入的功能,基于发布-订阅模型,支持跨版本复制、选择性复制和表级复制。

逻辑复制前提条件

1. 数据库配置

发布者和订阅者都需要配置以下参数:

bash
# 启用逻辑复制
echo "wal_level = logical" >> postgresql.conf
# 设置复制槽保留时间
echo "max_replication_slots = 10" >> postgresql.conf
# 设置逻辑复制工作者数量
echo "max_logical_replication_workers = 4" >> postgresql.conf
# 设置每个订阅的工作者数量
echo "max_worker_processes = 10" >> postgresql.conf

# 重新加载配置(发布者需要重启数据库)
pg_ctl reload -D /var/lib/postgresql/14/main

2. 复制用户权限

创建具有复制权限的用户:

sql
-- 创建复制用户
CREATE USER replicator WITH REPLICATION LOGIN PASSWORD 'password';

-- 在pg_hba.conf中添加复制权限
echo "host replication replicator 192.168.1.0/24 md5" >> pg_hba.conf
echo "host all all 192.168.1.0/24 md5" >> pg_hba.conf

3. 表要求

  • 表必须有主键或唯一约束
  • 发布者和订阅者的表结构必须兼容
  • 订阅者的表必须已存在(或使用copy_data = true自动创建)

发布配置

1. 创建基本发布

sql
-- 创建发布(指定表)
CREATE PUBLICATION my_publication FOR TABLE users, orders;

-- 创建发布(所有表)
CREATE PUBLICATION all_tables FOR ALL TABLES;

-- 创建发布(指定操作类型)
CREATE PUBLICATION my_publication 
FOR TABLE users, orders 
WITH (publish = 'insert, update, delete');

2. 高级发布配置

sql
-- 创建发布(仅包含特定列)
-- 注意:PostgreSQL不直接支持列级复制,需要通过视图实现
CREATE VIEW users_view AS SELECT id, name, email FROM users;
CREATE PUBLICATION users_publication FOR TABLE users_view;

-- 创建发布(包含TRUNCATE操作)
CREATE PUBLICATION my_publication 
FOR TABLE users, orders 
WITH (publish = 'insert, update, delete, truncate');

3. 管理发布

sql
-- 添加表到发布
ALTER PUBLICATION my_publication ADD TABLE products;

-- 从发布中移除表
ALTER PUBLICATION my_publication DROP TABLE products;

-- 修改发布配置
ALTER PUBLICATION my_publication SET (publish = 'insert, update, delete');

-- 查看发布
SELECT * FROM pg_publication;

-- 查看发布包含的表
SELECT * FROM pg_publication_tables;

-- 删除发布
DROP PUBLICATION my_publication;

订阅配置

1. 创建基本订阅

sql
-- 创建订阅
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication;

-- 创建订阅(不自动复制数据)
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication
WITH (copy_data = false);

2. 高级订阅配置

sql
-- 创建订阅(指定复制槽名称)
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication
WITH (
  copy_data = true,
  create_slot = true,
  slot_name = 'my_slot',
  enabled = true,
  synchronous_commit = 'on'
);

-- 创建订阅(使用已存在的复制槽)
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication
WITH (
  copy_data = true,
  create_slot = false,
  slot_name = 'existing_slot',
  enabled = true
);

3. 管理订阅

sql
-- 禁用订阅
ALTER SUBSCRIPTION my_subscription DISABLE;

-- 启用订阅
ALTER SUBSCRIPTION my_subscription ENABLE;

-- 刷新发布(更新订阅的表列表)
ALTER SUBSCRIPTION my_subscription REFRESH PUBLICATION;

-- 修改订阅连接信息
ALTER SUBSCRIPTION my_subscription CONNECTION 'host=new_pub_host port=5432 user=replicator dbname=mydb password=password';

-- 修改订阅配置参数
ALTER SUBSCRIPTION my_subscription SET (synchronous_commit = 'off');

-- 删除订阅
DROP SUBSCRIPTION my_subscription;

监控逻辑复制

1. 查看复制状态

sql
-- 查看订阅状态
SELECT 
  subname, 
  subdbid, 
  subenabled, 
  subconninfo, 
  subslotname 
FROM pg_subscription;

-- 查看订阅统计信息
SELECT 
  subname, 
  received_lsn, 
  last_msg_send_time, 
  last_msg_receipt_time, 
  latest_end_lsn, 
  latest_end_time 
FROM pg_stat_subscription;

2. 监控复制延迟

sql
-- 查看复制延迟
SELECT 
  now() - last_msg_receipt_time AS lag_time,
  (latest_end_lsn - received_lsn) AS lag_bytes
FROM pg_stat_subscription;

-- 查看复制槽状态
SELECT 
  slot_name, 
  plugin, 
  slot_type, 
  active, 
  restart_lsn, 
  confirmed_flush_lsn,
  pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;

3. 查看复制进程

sql
-- 查看逻辑复制工作者进程
SELECT 
  pid, 
  backend_type, 
  usename, 
  application_name, 
  client_addr, 
  state, 
  query
FROM pg_stat_activity 
WHERE backend_type LIKE '%logical%' OR application_name LIKE '%subscription%';

冲突处理

1. 查看冲突

sql
-- 查看冲突统计
SELECT * FROM pg_stat_subscription_conflicts;

-- 查看详细冲突日志
SELECT * FROM pg_log WHERE message LIKE '%conflict%';

2. 冲突类型

常见的冲突类型包括:

  • 唯一约束冲突(duplicate key violation)
  • 外键约束冲突(foreign key violation)
  • 数据类型不兼容(data type mismatch)

3. 冲突处理方法

sql
-- 方法1:跳过冲突事务(不推荐,可能导致数据不一致)
ALTER SUBSCRIPTION my_subscription SET (skip_errors = true);

-- 方法2:禁用订阅,手动解决冲突后重新启用
ALTER SUBSCRIPTION my_subscription DISABLE;
-- 手动修复冲突数据
UPDATE users SET name = 'new_name' WHERE id = 1;
-- 重新启用订阅
ALTER SUBSCRIPTION my_subscription ENABLE;

-- 方法3:使用初始数据复制重新同步
ALTER SUBSCRIPTION my_subscription DISABLE;
DROP SUBSCRIPTION my_subscription;
-- 重新创建订阅,使用copy_data = true重新同步数据
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication
WITH (copy_data = true, create_slot = true);

4. 冲突预防

  • 避免在订阅者上直接写入数据
  • 使用只读模式的订阅者
  • 设计合理的主键和唯一约束
  • 确保发布者和订阅者的表结构一致
  • 使用合适的冲突检测机制

逻辑复制维护

1. 复制槽管理

sql
-- 查看复制槽
SELECT * FROM pg_replication_slots;

-- 删除不再使用的复制槽
SELECT pg_drop_replication_slot('unused_slot');

-- 检查复制槽的WAL消耗
SELECT 
  slot_name, 
  pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS wal_used_bytes
FROM pg_replication_slots;

2. 清理过期WAL

sql
-- 查看WAL保留情况
SELECT 
  name, 
  setting, 
  unit 
FROM pg_settings 
WHERE name LIKE '%wal%' OR name LIKE '%checkpoint%';

-- 手动清理WAL(仅在必要时)
SELECT pg_switch_wal();
SELECT pg_archivecleanup('/var/lib/postgresql/14/main/pg_wal', pg_current_wal_lsn());

3. 重新同步数据

sql
-- 方法1:使用copy_data重新同步
ALTER SUBSCRIPTION my_subscription DISABLE;
ALTER SUBSCRIPTION my_subscription SET (slot_name = NONE);
ALTER SUBSCRIPTION my_subscription REFRESH PUBLICATION WITH (copy_data = true);
ALTER SUBSCRIPTION my_subscription ENABLE;

-- 方法2:使用pg_dump/pg_restore重新同步
pg_dump -h pub_host -U replicator -d mydb -t users -t orders > dump.sql
psql -h sub_host -U replicator -d mydb < dump.sql

逻辑复制最佳实践

1. 配置最佳实践

  • 合理设置wal_level = logical
  • 根据订阅数量调整max_replication_slotsmax_logical_replication_workers
  • 使用合适的synchronous_commit设置(生产环境建议使用onremote_write
  • 定期监控复制延迟,设置延迟告警

2. 性能优化

  • 避免在单个发布中包含过多表
  • 使用多个订阅并行复制不同的表组
  • 调整max_logical_replication_workers提高并行复制性能
  • 考虑使用分区表减少单个表的复制压力

3. 高可用配置

  • 结合物理复制实现级联复制
  • 使用多个订阅者实现多活架构
  • 配置自动故障转移机制
  • 定期备份复制槽信息

4. 安全考虑

  • 限制复制用户的权限,仅授予必要的权限
  • 使用SSL加密复制连接
  • 定期更新复制用户密码
  • 严格配置pg_hba.conf,限制复制连接的来源IP

常见问题(FAQ)

Q1: 逻辑复制和物理复制有什么区别?

A1:

  • 逻辑复制基于SQL语句复制,支持跨版本、选择性复制和表级复制
  • 物理复制基于WAL文件复制,要求版本一致,复制整个数据库
  • 逻辑复制支持单向和双向复制,物理复制主要用于主从架构

Q2: 如何处理逻辑复制中的冲突?

A2:

  • 避免在订阅者上直接写入数据
  • 使用skip_errors = true跳过冲突(不推荐)
  • 手动修复冲突数据后重新启用订阅
  • 使用初始数据复制重新同步

Q3: 逻辑复制支持跨版本吗?

A3: 支持,发布者版本可以低于或高于订阅者版本,但建议版本差异不超过2个主要版本(如PostgreSQL 12到PostgreSQL 14)。

Q4: 如何监控逻辑复制延迟?

A4:

  • 使用pg_stat_subscription视图查看延迟时间和字节数
  • 使用pg_replication_slots视图查看复制槽的WAL消耗
  • 结合Prometheus+Grafana设置复制延迟告警

Q5: 逻辑复制支持DDL语句吗?

A5: PostgreSQL 14及以上版本支持DDL语句复制,需要在创建发布时指定publish_via_partition_root = truepublish = 'insert, update, delete, truncate'

Q6: 如何暂停和恢复逻辑复制?

A6:

  • 使用ALTER SUBSCRIPTION my_subscription DISABLE暂停复制
  • 使用ALTER SUBSCRIPTION my_subscription ENABLE恢复复制

Q7: 逻辑复制槽满了怎么办?

A7:

  • 检查订阅者是否正常运行
  • 清理不再使用的复制槽
  • 增加max_wal_size参数
  • 调整wal_keep_segmentswal_keep_size参数

Q8: 如何迁移逻辑复制订阅?

A8:

  • 在新订阅者上创建相同的表结构
  • 在发布者上创建新的复制槽
  • 在新订阅者上创建订阅
  • 验证数据一致性后删除旧订阅

Q9: 逻辑复制支持双向复制吗?

A9: 支持,但需要注意避免循环复制和冲突。可以通过以下方式实现:

  • 在不同的表组上设置双向复制
  • 使用触发器或规则防止循环复制
  • 设计合理的冲突检测和解决机制

Q10: 如何备份和恢复逻辑复制配置?

A10:

  • 备份发布配置:pg_dump -s -t pg_publication -t pg_publication_tables mydb
  • 备份订阅配置:pg_dump -s -t pg_subscription mydb
  • 恢复配置:psql -d mydb < dump.sql
  • 重新创建复制槽和订阅