Skip to content

PostgreSQL 存储过程安全性

存储过程是PostgreSQL中重要的数据库对象,它们封装了复杂的业务逻辑和数据操作。存储过程的安全性直接关系到整个数据库系统的安全,任何安全漏洞都可能导致数据泄露或系统被攻击。

存储过程面临的安全威胁主要包括SQL注入攻击、权限提升、敏感信息泄露、拒绝服务攻击等。为了确保存储过程的安全性,需要从设计、开发、部署和运维等多个环节进行全面考虑和防护。

SQL注入防护

参数化查询

在存储过程中使用参数化查询是防止SQL注入的最有效方法:

sql
-- 安全的参数化查询
CREATE OR REPLACE FUNCTION get_user_by_id(p_user_id INTEGER)
RETURNS TABLE (user_id INTEGER, username VARCHAR(50), email VARCHAR(100)) AS $$
BEGIN
    RETURN QUERY
    SELECT u.id, u.username, u.email
    FROM users u
    WHERE u.id = p_user_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- 不安全的动态SQL(容易受到SQL注入攻击)
CREATE OR REPLACE FUNCTION unsafe_get_user(p_user_id INTEGER)
RETURNS TABLE (user_id INTEGER, username VARCHAR(50)) AS $$
DECLARE
    v_query TEXT;
BEGIN
    -- 危险:直接拼接用户输入
    v_query := 'SELECT id, username FROM users WHERE id = ' || p_user_id;
    RETURN QUERY EXECUTE v_query;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

使用USING子句

在动态SQL中使用USING子句传递参数,避免SQL注入:

sql
-- 使用USING传递参数
CREATE OR REPLACE FUNCTION safe_search_users(
    p_search_term TEXT,
    p_limit INTEGER DEFAULT 10
) RETURNS TABLE (user_id INTEGER, username VARCHAR(50), email VARCHAR(100)) AS $$
DECLARE
    v_query TEXT;
BEGIN
    v_query := 'SELECT id, username, email FROM users ' ||
               'WHERE username ILIKE $1 OR email ILIKE $1 ' ||
               'ORDER BY username LIMIT ' || p_limit;
    
    RETURN QUERY EXECUTE v_query USING '%' || p_search_term || '%';
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- 使用FORMAT函数构建安全查询
CREATE OR REPLACE FUNCTION safe_query(
    p_table_name TEXT,
    p_column_name TEXT,
    p_value TEXT
) RETURNS TABLE (result JSONB) AS $$
DECLARE
    v_query TEXT;
BEGIN
    -- 使用FORMAT构建查询,使用%I进行标识符转义
    v_query := FORMAT('SELECT to_jsonb(row) FROM (SELECT %I FROM %I WHERE %I = $1) row',
                      p_column_name, p_table_name, p_column_name);
    
    RETURN QUERY EXECUTE v_query USING p_value;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

输入验证

在存储过程中实施严格的输入验证:

sql
-- 创建输入验证函数
CREATE OR REPLACE FUNCTION validate_input(
    p_input TEXT,
    p_input_type TEXT
) RETURNS BOOLEAN AS $$
BEGIN
    CASE p_input_type
        WHEN 'user_id' THEN
            IF p_input !~ '^[0-9]+$' THEN
                RAISE EXCEPTION '无效的用户ID';
            END IF;
        WHEN 'email' THEN
            IF p_input !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}$' THEN
                RAISE EXCEPTION '无效的邮箱地址';
            END IF;
        WHEN 'username' THEN
            IF LENGTH(p_input) < 3 OR LENGTH(p_input) > 50 THEN
                RAISE EXCEPTION '用户名长度必须在3-50之间';
            END IF;
            IF p_input !~ '^[A-Za-z0-9_]+$' THEN
                RAISE EXCEPTION '用户名只能包含字母、数字和下划线';
            END IF;
        WHEN 'phone' THEN
            IF p_input !~ '^[0-9-]+$' THEN
                RAISE EXCEPTION '无效的电话号码';
            END IF;
        ELSE
            RAISE EXCEPTION '未知的输入类型';
    END CASE;
    
    RETURN TRUE;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- 在存储过程中使用输入验证
CREATE OR REPLACE FUNCTION create_user(
    p_username TEXT,
    p_email TEXT,
    p_phone TEXT
) RETURNS INTEGER AS $$
DECLARE
    v_user_id INTEGER;
BEGIN
    -- 验证所有输入
    PERFORM validate_input(p_username, 'username');
    PERFORM validate_input(p_email, 'email');
    PERFORM validate_input(p_phone, 'phone');
    
    -- 创建用户
    INSERT INTO users (username, email, phone)
    VALUES (p_username, p_email, p_phone)
    RETURNING id INTO v_user_id;
    
    RETURN v_user_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

权限控制

SECURITY DEFINER与SECURITY INVOKER

理解并正确使用SECURITY DEFINER和SECURITY INVOKER选项:

sql
-- SECURITY DEFINER:以定义者的权限执行
CREATE OR REPLACE FUNCTION admin_delete_user(p_user_id INTEGER)
RETURNS BOOLEAN AS $$
BEGIN
    DELETE FROM users WHERE id = p_user_id;
    RETURN TRUE;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- SECURITY INVOKER:以调用者的权限执行(默认)
CREATE OR REPLACE FUNCTION regular_get_user(p_user_id INTEGER)
RETURNS TABLE (id INTEGER, username VARCHAR(50)) AS $$
BEGIN
    RETURN QUERY SELECT id, username FROM users WHERE id = p_user_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER VOLATILE;

最小权限原则

遵循最小权限原则,只授予必要的权限:

sql
-- 创建具有特定权限的角色
CREATE ROLE app_reader;
CREATE ROLE app_writer;
CREATE ROLE app_admin;

-- 授予读取权限
GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_reader;

-- 授予写入权限
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA public TO app_writer;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO app_writer;

-- 授予管理权限
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO app_admin;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO app_admin;

-- 创建函数时指定 SECURITY DEFINER,并授予特定权限
CREATE OR REPLACE FUNCTION sensitive_operation(p_data TEXT)
RETURNS BOOLEAN AS $$
BEGIN
    -- 函数以所有者权限执行
    INSERT INTO audit_log (operation, data) VALUES ('sensitive', p_data);
    RETURN TRUE;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- 只授予执行权限,不授予底层表权限
GRANT EXECUTE ON FUNCTION sensitive_operation(TEXT) TO app_user;

权限验证

在存储过程中实施权限验证:

sql
-- 检查调用者权限
CREATE OR REPLACE FUNCTION admin_only_operation(p_id INTEGER)
RETURNS BOOLEAN AS $$
DECLARE
    v_has_permission BOOLEAN;
BEGIN
    -- 检查调用者是否为管理员
    SELECT EXISTS (
        SELECT 1 FROM information_schema.role_table_grantees
        WHERE grantee = current_user
        AND table_name = 'admin_table'
    ) INTO v_has_permission;
    
    IF NOT v_has_permission THEN
        RAISE EXCEPTION '权限不足:需要管理员权限';
    END IF;
    
    -- 执行操作
    DELETE FROM admin_table WHERE id = p_id;
    RETURN TRUE;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

敏感数据保护

数据脱敏

在存储过程中实现敏感数据脱敏:

sql
-- 创建数据脱敏函数
CREATE OR REPLACE FUNCTION mask_email(p_email TEXT)
RETURNS TEXT AS $$
BEGIN
    IF p_email IS NULL THEN
        RETURN NULL;
    END IF;
    RETURN LEFT(p_email, 2) || '***@***' || RIGHT(p_email, 3);
END;
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE FUNCTION mask_phone(p_phone TEXT)
RETURNS TEXT AS $$
BEGIN
    IF p_phone IS NULL THEN
        RETURN NULL;
    END IF;
    RETURN LEFT(p_phone, 3) || '****' || RIGHT(p_phone, 4);
END;
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE FUNCTION mask_id_card(p_id_card TEXT)
RETURNS TEXT AS $$
BEGIN
    IF p_id_card IS NULL THEN
        RETURN NULL;
    END IF;
    IF LENGTH(p_id_card) = 18 THEN
        RETURN LEFT(p_id_card, 6) || '********' || RIGHT(p_id_card, 4);
    END IF;
    RETURN p_id_card;
END;
$$ LANGUAGE plpgsql IMMUTABLE;

-- 脱敏查询函数
CREATE OR REPLACE FUNCTION get_user_public_info(p_user_id INTEGER)
RETURNS TABLE (user_id INTEGER, username TEXT, email_masked TEXT, phone_masked TEXT) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        u.id,
        u.username,
        mask_email(u.email) AS email_masked,
        mask_phone(u.phone) AS phone_masked
    FROM users u
    WHERE u.id = p_user_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

审计日志

记录敏感操作的审计日志:

sql
-- 创建审计日志表
CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    operation_type VARCHAR(50) NOT NULL,
    table_name VARCHAR(100),
    record_id BIGINT,
    old_data JSONB,
    new_data JSONB,
    user_name VARCHAR(100) NOT NULL,
    client_addr INET,
    operation_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 创建审计触发器函数
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS TRIGGER AS $$
DECLARE
    v_old_data JSONB;
    v_new_data JSONB;
BEGIN
    IF TG_OP = 'DELETE' THEN
        v_old_data := to_jsonb(OLD);
        v_new_data := NULL;
    ELSIF TG_OP = 'INSERT' THEN
        v_old_data := NULL;
        v_new_data := to_jsonb(NEW);
    ELSIF TG_OP = 'UPDATE' THEN
        v_old_data := to_jsonb(OLD);
        v_new_data := to_jsonb(NEW);
    END IF;
    
    INSERT INTO audit_log (operation_type, table_name, record_id, old_data, new_data, user_name, client_addr)
    VALUES (
        TG_OP,
        TG_TABLE_NAME,
        COALESCE(NEW.id, OLD.id),
        v_old_data,
        v_new_data,
        current_setting('app.current_user', TRUE),
        inet_client_addr()
    );
    
    RETURN NEW;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- 应用审计触发器
CREATE TRIGGER audit_users_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();

错误处理

安全错误处理

避免在错误消息中泄露敏感信息:

sql
-- 不安全的错误处理(泄露内部信息)
CREATE OR REPLACE FUNCTION unsafe_lookup(p_id INTEGER)
RETURNS TABLE (id INTEGER, data TEXT) AS $$
BEGIN
    RETURN QUERY SELECT id, secret_data FROM sensitive_table WHERE id = p_id;
EXCEPTION WHEN OTHERS THEN
    -- 危险:返回详细错误信息
    RAISE NOTICE 'Error: %', SQLERRM;
    RAISE NOTICE 'Detail: %', SQLSTATE;
    RETURN;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- 安全的错误处理(不泄露敏感信息)
CREATE OR REPLACE FUNCTION safe_lookup(p_id INTEGER)
RETURNS TABLE (id INTEGER, data TEXT) AS $$
BEGIN
    RETURN QUERY SELECT id, secret_data FROM sensitive_table WHERE id = p_id;
EXCEPTION WHEN OTHERS THEN
    -- 安全:记录详细日志,返回通用错误
    RAISE NOTICE '操作失败,请联系管理员';
    RETURN;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- 自定义错误处理
CREATE OR REPLACE FUNCTION validated_lookup(p_id INTEGER)
RETURNS TABLE (id INTEGER, data TEXT) AS $$
BEGIN
    IF p_id < 0 THEN
        RAISE EXCEPTION 'invalid_id' USING MESSAGE = '无效的ID参数';
    END IF;
    
    RETURN QUERY SELECT id, secret_data FROM sensitive_table WHERE id = p_id;
    
    IF NOT FOUND THEN
        RAISE EXCEPTION 'not_found' USING MESSAGE = '记录不存在';
    END IF;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

性能与安全平衡

防止资源耗尽

限制存储过程的资源使用:

sql
-- 限制返回行数
CREATE OR REPLACE FUNCTION safe_list_users(p_limit INTEGER DEFAULT 100, p_offset INTEGER DEFAULT 0)
RETURNS TABLE (id INTEGER, username VARCHAR(50)) AS $$
BEGIN
    -- 限制最大返回行数
    IF p_limit > 1000 THEN
        p_limit := 1000;
    END IF;
    
    RETURN QUERY
    SELECT u.id, u.username
    FROM users u
    ORDER BY u.id
    LIMIT p_limit OFFSET p_offset;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- 设置事务超时
CREATE OR REPLACE FUNCTION long_running_operation()
RETURNS BOOLEAN AS $$
BEGIN
    -- 设置执行超时
    SET statement_timeout = '30s';
    
    -- 执行长时间操作
    PERFORM pg_sleep(10);  -- 模拟长时间操作
    
    RETURN TRUE;
EXCEPTION WHEN QUERY_CANCELED THEN
    RAISE EXCEPTION '操作超时';
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

防止死锁

设计存储过程时避免死锁:

sql
-- 按固定顺序访问表
CREATE OR REPLACE FUNCTION safe_transfer(
    p_from_account BIGINT,
    p_to_account BIGINT,
    p_amount NUMERIC
) RETURNS BOOLEAN AS $$
DECLARE
    v_lock1 BIGINT;
    v_lock2 BIGINT;
BEGIN
    -- 确定锁定顺序
    IF p_from_account < p_to_account THEN
        v_lock1 := p_from_account;
        v_lock2 := p_to_account;
    ELSE
        v_lock1 := p_to_account;
        v_lock2 := p_from_account;
    END IF;
    
    -- 按顺序锁定账户
    PERFORM pg_advisory_xact_lock(v_lock1);
    PERFORM pg_advisory_xact_lock(v_lock2);
    
    -- 执行转账
    UPDATE accounts SET balance = balance - p_amount WHERE id = p_from_account;
    UPDATE accounts SET balance = balance + p_amount WHERE id = p_to_account;
    
    RETURN TRUE;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

安全开发最佳实践

代码审查清单

存储过程代码审查时应该检查:

markdown
## 存储过程安全审查清单

### 输入验证
- [ ] 所有外部输入是否都经过验证?
- [ ] 是否使用了参数化查询?
- [ ] 是否避免了动态SQL拼接?

### 权限控制
- [ ] 是否使用了正确的SECURITY选项?
- [ ] 是否遵循最小权限原则?
- [ ] 是否验证调用者权限?

### 敏感数据
- [ ] 是否对敏感数据进行了脱敏处理?
- [ ] 是否记录了敏感操作审计日志?
- [ ] 是否避免了错误消息中泄露敏感信息?

### 错误处理
- [ ] 是否正确处理了所有异常?
- [ ] 是否避免了错误信息泄露系统细节?
- [ ] 是否记录了错误日志?

### 性能安全
- [ ] 是否限制了资源使用?
- [ ] 是否避免了死锁?
- [ ] 是否设置了适当的超时?

### 加密
- [ ] 敏感数据是否加密存储?
- [ ] 是否使用了强加密算法?
- [ ] 密钥管理是否安全?

安全配置建议

sql
-- 建议的安全配置

-- 1. 设置适当的事务超时
ALTER ROLE application_user SET statement_timeout = '30s';

-- 2. 启用行级安全性
ALTER TABLE sensitive_data ENABLE ROW LEVEL SECURITY;

-- 3. 创建行级安全策略
CREATE POLICY sensitive_data_access_policy ON sensitive_data
    FOR SELECT
    USING (current_setting('app.current_user', TRUE) = owner_id::TEXT);

-- 4. 限制函数创建权限
REVOKE CREATE ON SCHEMA public FROM PUBLIC;

-- 5. 启用安全标签
SELECT * FROM pg_seclabel;

常见问题与解决方案

SQL注入攻击检测

检测和防止SQL注入攻击:

sql
-- 检测可疑的动态SQL
CREATE OR REPLACE FUNCTION detect_sql_injection()
RETURNS TABLE (suspect_functions TEXT) AS $$
BEGIN
    RETURN QUERY
    SELECT pg_get_functiondef(oid) AS suspect_functions
    FROM pg_proc
    WHERE prokind = 'f'
    AND pg_function_is_visible(oid)
    AND (pg_get_functiondef(oid) ILIKE '%execute%'
         OR pg_get_functiondef(oid) ILIKE '%dynamic%')
    AND NOT pg_get_functiondef(oid) ILIKE '%using%';
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

权限提升防护

防止权限提升攻击:

sql
-- 监控权限变更
CREATE OR REPLACE FUNCTION monitor_privilege_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP IN ('GRANT', 'REVOKE') THEN
        INSERT INTO privilege_audit_log (operation, granted_by, target_role, permissions, grant_time)
        VALUES (
            TG_OP,
            current_setting('app.current_user', TRUE),
            TG_GRANTEE,
            TG_ARGV[0],
            CURRENT_TIMESTAMP
        );
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

常见问题(FAQ)

Q1: 如何验证存储过程的安全性?

A1: 验证存储过程安全性的方法:

  • 使用代码审查清单检查每个存储过程
  • 使用自动化工具扫描SQL注入漏洞
  • 进行渗透测试
  • 审查错误处理和日志记录
  • 检查权限设置

Q2: SECURITY DEFINER函数有什么安全风险?

A2: SECURITY DEFINER函数的风险包括:

  • 可能被用于权限提升攻击
  • 如果函数有漏洞,攻击者可能获得函数所有者的权限
  • 错误的输入验证可能导致SQL注入

Q3: 如何防止存储过程中的SQL注入?

A3: 防止SQL注入的方法:

  • 始终使用参数化查询
  • 使用USING子句传递参数
  • 实施严格的输入验证
  • 避免动态SQL拼接
  • 使用FORMAT函数时正确转义标识符

Q4: 存储过程中如何安全地处理密码?

A4: 密码处理最佳实践:

  • 使用哈希函数(如bcrypt)存储密码
  • 使用随机盐
  • 实施密码强度验证
  • 不在日志中记录密码
  • 使用专门的密码处理函数

Q5: 如何审计存储过程的执行?

A5: 审计存储过程执行的方法:

  • 创建审计日志表
  • 在存储过程中记录关键操作
  • 使用审计触发器
  • 集成到企业审计系统
  • 定期审查审计日志

Q6: 存储过程性能问题会影响安全性吗?

A6: 性能问题可能导致安全风险:

  • 超时设置不当可能导致拒绝服务
  • 资源耗尽可能影响系统可用性
  • 死锁可能导致数据不一致
  • 建议设置适当的超时和资源限制

Q7: 如何更新生产环境中的存储过程?

A7: 更新存储过程的安全建议:

  • 在测试环境验证更改
  • 使用事务更新,确保原子性
  • 设置适当的锁定策略
  • 更新前后进行备份
  • 记录变更历史
  • 通知相关团队