外观
PostgreSQL 逻辑复制
逻辑复制是PostgreSQL 10引入的功能,基于发布-订阅模型,支持跨版本复制、选择性复制和表级复制。
逻辑复制前提条件
1. 数据库配置
发布者和订阅者都需要配置以下参数:
bash
# 启用逻辑复制
echo "wal_level = logical" >> postgresql.conf
# 设置复制槽保留时间
echo "max_replication_slots = 10" >> postgresql.conf
# 设置逻辑复制工作者数量
echo "max_logical_replication_workers = 4" >> postgresql.conf
# 设置每个订阅的工作者数量
echo "max_worker_processes = 10" >> postgresql.conf
# 重新加载配置(发布者需要重启数据库)
pg_ctl reload -D /var/lib/postgresql/14/main2. 复制用户权限
创建具有复制权限的用户:
sql
-- 创建复制用户
CREATE USER replicator WITH REPLICATION LOGIN PASSWORD 'password';
-- 在pg_hba.conf中添加复制权限
echo "host replication replicator 192.168.1.0/24 md5" >> pg_hba.conf
echo "host all all 192.168.1.0/24 md5" >> pg_hba.conf3. 表要求
- 表必须有主键或唯一约束
- 发布者和订阅者的表结构必须兼容
- 订阅者的表必须已存在(或使用
copy_data = true自动创建)
发布配置
1. 创建基本发布
sql
-- 创建发布(指定表)
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- 创建发布(所有表)
CREATE PUBLICATION all_tables FOR ALL TABLES;
-- 创建发布(指定操作类型)
CREATE PUBLICATION my_publication
FOR TABLE users, orders
WITH (publish = 'insert, update, delete');2. 高级发布配置
sql
-- 创建发布(仅包含特定列)
-- 注意:PostgreSQL不直接支持列级复制,需要通过视图实现
CREATE VIEW users_view AS SELECT id, name, email FROM users;
CREATE PUBLICATION users_publication FOR TABLE users_view;
-- 创建发布(包含TRUNCATE操作)
CREATE PUBLICATION my_publication
FOR TABLE users, orders
WITH (publish = 'insert, update, delete, truncate');3. 管理发布
sql
-- 添加表到发布
ALTER PUBLICATION my_publication ADD TABLE products;
-- 从发布中移除表
ALTER PUBLICATION my_publication DROP TABLE products;
-- 修改发布配置
ALTER PUBLICATION my_publication SET (publish = 'insert, update, delete');
-- 查看发布
SELECT * FROM pg_publication;
-- 查看发布包含的表
SELECT * FROM pg_publication_tables;
-- 删除发布
DROP PUBLICATION my_publication;订阅配置
1. 创建基本订阅
sql
-- 创建订阅
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication;
-- 创建订阅(不自动复制数据)
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication
WITH (copy_data = false);2. 高级订阅配置
sql
-- 创建订阅(指定复制槽名称)
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication
WITH (
copy_data = true,
create_slot = true,
slot_name = 'my_slot',
enabled = true,
synchronous_commit = 'on'
);
-- 创建订阅(使用已存在的复制槽)
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication
WITH (
copy_data = true,
create_slot = false,
slot_name = 'existing_slot',
enabled = true
);3. 管理订阅
sql
-- 禁用订阅
ALTER SUBSCRIPTION my_subscription DISABLE;
-- 启用订阅
ALTER SUBSCRIPTION my_subscription ENABLE;
-- 刷新发布(更新订阅的表列表)
ALTER SUBSCRIPTION my_subscription REFRESH PUBLICATION;
-- 修改订阅连接信息
ALTER SUBSCRIPTION my_subscription CONNECTION 'host=new_pub_host port=5432 user=replicator dbname=mydb password=password';
-- 修改订阅配置参数
ALTER SUBSCRIPTION my_subscription SET (synchronous_commit = 'off');
-- 删除订阅
DROP SUBSCRIPTION my_subscription;监控逻辑复制
1. 查看复制状态
sql
-- 查看订阅状态
SELECT
subname,
subdbid,
subenabled,
subconninfo,
subslotname
FROM pg_subscription;
-- 查看订阅统计信息
SELECT
subname,
received_lsn,
last_msg_send_time,
last_msg_receipt_time,
latest_end_lsn,
latest_end_time
FROM pg_stat_subscription;2. 监控复制延迟
sql
-- 查看复制延迟
SELECT
now() - last_msg_receipt_time AS lag_time,
(latest_end_lsn - received_lsn) AS lag_bytes
FROM pg_stat_subscription;
-- 查看复制槽状态
SELECT
slot_name,
plugin,
slot_type,
active,
restart_lsn,
confirmed_flush_lsn,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;3. 查看复制进程
sql
-- 查看逻辑复制工作者进程
SELECT
pid,
backend_type,
usename,
application_name,
client_addr,
state,
query
FROM pg_stat_activity
WHERE backend_type LIKE '%logical%' OR application_name LIKE '%subscription%';冲突处理
1. 查看冲突
sql
-- 查看冲突统计
SELECT * FROM pg_stat_subscription_conflicts;
-- 查看详细冲突日志
SELECT * FROM pg_log WHERE message LIKE '%conflict%';2. 冲突类型
常见的冲突类型包括:
- 唯一约束冲突(duplicate key violation)
- 外键约束冲突(foreign key violation)
- 数据类型不兼容(data type mismatch)
3. 冲突处理方法
sql
-- 方法1:跳过冲突事务(不推荐,可能导致数据不一致)
ALTER SUBSCRIPTION my_subscription SET (skip_errors = true);
-- 方法2:禁用订阅,手动解决冲突后重新启用
ALTER SUBSCRIPTION my_subscription DISABLE;
-- 手动修复冲突数据
UPDATE users SET name = 'new_name' WHERE id = 1;
-- 重新启用订阅
ALTER SUBSCRIPTION my_subscription ENABLE;
-- 方法3:使用初始数据复制重新同步
ALTER SUBSCRIPTION my_subscription DISABLE;
DROP SUBSCRIPTION my_subscription;
-- 重新创建订阅,使用copy_data = true重新同步数据
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=pub_host port=5432 user=replicator dbname=mydb password=password'
PUBLICATION my_publication
WITH (copy_data = true, create_slot = true);4. 冲突预防
- 避免在订阅者上直接写入数据
- 使用只读模式的订阅者
- 设计合理的主键和唯一约束
- 确保发布者和订阅者的表结构一致
- 使用合适的冲突检测机制
逻辑复制维护
1. 复制槽管理
sql
-- 查看复制槽
SELECT * FROM pg_replication_slots;
-- 删除不再使用的复制槽
SELECT pg_drop_replication_slot('unused_slot');
-- 检查复制槽的WAL消耗
SELECT
slot_name,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS wal_used_bytes
FROM pg_replication_slots;2. 清理过期WAL
sql
-- 查看WAL保留情况
SELECT
name,
setting,
unit
FROM pg_settings
WHERE name LIKE '%wal%' OR name LIKE '%checkpoint%';
-- 手动清理WAL(仅在必要时)
SELECT pg_switch_wal();
SELECT pg_archivecleanup('/var/lib/postgresql/14/main/pg_wal', pg_current_wal_lsn());3. 重新同步数据
sql
-- 方法1:使用copy_data重新同步
ALTER SUBSCRIPTION my_subscription DISABLE;
ALTER SUBSCRIPTION my_subscription SET (slot_name = NONE);
ALTER SUBSCRIPTION my_subscription REFRESH PUBLICATION WITH (copy_data = true);
ALTER SUBSCRIPTION my_subscription ENABLE;
-- 方法2:使用pg_dump/pg_restore重新同步
pg_dump -h pub_host -U replicator -d mydb -t users -t orders > dump.sql
psql -h sub_host -U replicator -d mydb < dump.sql逻辑复制最佳实践
1. 配置最佳实践
- 合理设置
wal_level = logical - 根据订阅数量调整
max_replication_slots和max_logical_replication_workers - 使用合适的
synchronous_commit设置(生产环境建议使用on或remote_write) - 定期监控复制延迟,设置延迟告警
2. 性能优化
- 避免在单个发布中包含过多表
- 使用多个订阅并行复制不同的表组
- 调整
max_logical_replication_workers提高并行复制性能 - 考虑使用分区表减少单个表的复制压力
3. 高可用配置
- 结合物理复制实现级联复制
- 使用多个订阅者实现多活架构
- 配置自动故障转移机制
- 定期备份复制槽信息
4. 安全考虑
- 限制复制用户的权限,仅授予必要的权限
- 使用SSL加密复制连接
- 定期更新复制用户密码
- 严格配置pg_hba.conf,限制复制连接的来源IP
常见问题(FAQ)
Q1: 逻辑复制和物理复制有什么区别?
A1:
- 逻辑复制基于SQL语句复制,支持跨版本、选择性复制和表级复制
- 物理复制基于WAL文件复制,要求版本一致,复制整个数据库
- 逻辑复制支持单向和双向复制,物理复制主要用于主从架构
Q2: 如何处理逻辑复制中的冲突?
A2:
- 避免在订阅者上直接写入数据
- 使用
skip_errors = true跳过冲突(不推荐) - 手动修复冲突数据后重新启用订阅
- 使用初始数据复制重新同步
Q3: 逻辑复制支持跨版本吗?
A3: 支持,发布者版本可以低于或高于订阅者版本,但建议版本差异不超过2个主要版本(如PostgreSQL 12到PostgreSQL 14)。
Q4: 如何监控逻辑复制延迟?
A4:
- 使用
pg_stat_subscription视图查看延迟时间和字节数 - 使用
pg_replication_slots视图查看复制槽的WAL消耗 - 结合Prometheus+Grafana设置复制延迟告警
Q5: 逻辑复制支持DDL语句吗?
A5: PostgreSQL 14及以上版本支持DDL语句复制,需要在创建发布时指定publish_via_partition_root = true和publish = 'insert, update, delete, truncate'。
Q6: 如何暂停和恢复逻辑复制?
A6:
- 使用
ALTER SUBSCRIPTION my_subscription DISABLE暂停复制 - 使用
ALTER SUBSCRIPTION my_subscription ENABLE恢复复制
Q7: 逻辑复制槽满了怎么办?
A7:
- 检查订阅者是否正常运行
- 清理不再使用的复制槽
- 增加
max_wal_size参数 - 调整
wal_keep_segments或wal_keep_size参数
Q8: 如何迁移逻辑复制订阅?
A8:
- 在新订阅者上创建相同的表结构
- 在发布者上创建新的复制槽
- 在新订阅者上创建订阅
- 验证数据一致性后删除旧订阅
Q9: 逻辑复制支持双向复制吗?
A9: 支持,但需要注意避免循环复制和冲突。可以通过以下方式实现:
- 在不同的表组上设置双向复制
- 使用触发器或规则防止循环复制
- 设计合理的冲突检测和解决机制
Q10: 如何备份和恢复逻辑复制配置?
A10:
- 备份发布配置:
pg_dump -s -t pg_publication -t pg_publication_tables mydb - 备份订阅配置:
pg_dump -s -t pg_subscription mydb - 恢复配置:
psql -d mydb < dump.sql - 重新创建复制槽和订阅
