Skip to content

OceanBase 监控脚本

核心概念

监控脚本是指用于自动化采集、分析和处理OceanBase数据库各种指标和状态信息的脚本程序。监控脚本可以帮助DBA和运维人员实时掌握数据库的运行状态,及时发现和处理潜在问题,提高数据库的可用性和可靠性。OceanBase监控脚本通常包括集群状态监控、性能指标采集、告警脚本、日志分析脚本等类型,可以通过多种方式部署和执行,如定时任务、监控系统集成等。

脚本开发基础

1. 环境准备

功能:准备监控脚本开发和运行的环境 适用场景

  • 监控脚本开发
  • 脚本测试和调试
  • 脚本部署和运行

环境要求

  • Python 3.6+ 或 Shell 环境
  • OceanBase 客户端工具(如 obclient、ocp-cli)
  • 必要的Python库(如 pymysql、requests、prometheus_client 等)

安装示例

bash
# 安装Python依赖
pip install pymysql requests prometheus_client pandas

# 安装OceanBase客户端
yum install -y obclient

2. 连接OceanBase集群

功能:建立与OceanBase集群的连接 适用场景

  • 从OceanBase集群获取监控数据
  • 执行管理命令
  • 检查集群状态

Python连接示例

python
import pymysql

# 连接OceanBase集群
def connect_ob(host, port, user, password, db):
    conn = pymysql.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        db=db,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return conn

# 使用示例
conn = connect_ob('127.0.0.1', 2881, 'root', 'password', 'oceanbase')
cursor = conn.cursor()
cursor.execute('SELECT * FROM oceanbase.GV$OB_SERVER')
result = cursor.fetchall()

Shell连接示例

bash
# 使用obclient连接OceanBase集群
obclient -h127.0.0.1 -P2881 -uroot -ppassword -Doceanbase -c

核心监控脚本

1. 集群状态监控脚本

功能:监控OceanBase集群的整体状态 适用场景

  • 实时监控集群健康状态
  • 发现集群异常节点
  • 跟踪集群状态变化

Python脚本示例

python
import pymysql
import time

# 连接OceanBase集群
def connect_ob():
    return pymysql.connect(
        host='127.0.0.1',
        port=2881,
        user='root',
        password='password',
        db='oceanbase',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )

# 监控集群状态
def monitor_cluster_status():
    conn = connect_ob()
    try:
        with conn.cursor() as cursor:
            # 查询集群节点状态
            cursor.execute('''
                SELECT 
                    s.svr_ip, 
                    s.svr_port, 
                    s.status, 
                    s.zone, 
                    s.start_service_time,
                    s.stop_time
                FROM oceanbase.GV$OB_SERVER s
            ''')
            servers = cursor.fetchall()
            
            print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 集群节点状态监控")
            print("-" * 80)
            print("IP地址       端口    状态    可用区  启动时间                停止时间")
            print("-" * 80)
            
            for server in servers:
                start_time = server['start_service_time'].strftime('%Y-%m-%d %H:%M:%S') if server['start_service_time'] else "-"
                stop_time = server['stop_time'].strftime('%Y-%m-%d %H:%M:%S') if server['stop_time'] else "-"
                print(f"{server['svr_ip']}  {server['svr_port']:5d}  {server['status']:6s}  {server['zone']:4s}  {start_time}  {stop_time}")
            
            # 查询集群整体状态
            cursor.execute('''
                SELECT 
                    cluster_id, 
                    cluster_name, 
                    status
                FROM oceanbase.DBA_OB_CLUSTERS
            ''')
            cluster = cursor.fetchone()
            
            print("\n" + "-" * 80)
            print(f"集群ID: {cluster['cluster_id']}, 集群名称: {cluster['cluster_name']}, 状态: {cluster['status']}")
            print("-" * 80)
            
    finally:
        conn.close()

# 主程序
if __name__ == "__main__":
    monitor_cluster_status()

2. 性能指标采集脚本

功能:采集OceanBase集群的性能指标 适用场景

  • 性能趋势分析
  • 性能问题定位
  • 性能优化评估

Python脚本示例

python
import pymysql
import time
import prometheus_client
from prometheus_client import Gauge

# 连接OceanBase集群
def connect_ob():
    return pymysql.connect(
        host='127.0.0.1',
        port=2881,
        user='root',
        password='password',
        db='oceanbase',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )

# 初始化Prometheus指标
def init_metrics():
    metrics = {
        'ob_server_cpu_usage': Gauge('ob_server_cpu_usage', 'CPU使用率', ['svr_ip', 'svr_port', 'zone']),
        'ob_server_memory_usage': Gauge('ob_server_memory_usage', '内存使用率', ['svr_ip', 'svr_port', 'zone']),
        'ob_server_disk_usage': Gauge('ob_server_disk_usage', '磁盘使用率', ['svr_ip', 'svr_port', 'zone', 'device']),
        'ob_session_count': Gauge('ob_session_count', '会话数量', ['svr_ip', 'svr_port', 'tenant_id', 'tenant_name']),
        'ob_active_transaction_count': Gauge('ob_active_transaction_count', '活跃事务数量', ['svr_ip', 'svr_port', 'tenant_id', 'tenant_name']),
        'ob_slow_sql_count': Gauge('ob_slow_sql_count', '慢SQL数量', ['svr_ip', 'svr_port', 'tenant_id', 'tenant_name'])
    }
    return metrics

# 采集性能指标
def collect_performance_metrics(metrics):
    conn = connect_ob()
    try:
        with conn.cursor() as cursor:
            # 采集节点CPU和内存使用率
            cursor.execute('''
                SELECT 
                    svr_ip, 
                    svr_port, 
                    zone,
                    cpu_total, 
                    cpu_assigned, 
                    mem_total, 
                    mem_assigned
                FROM oceanbase.GV$OB_SERVER
            ''')
            servers = cursor.fetchall()
            
            for server in servers:
                cpu_usage = (server['cpu_assigned'] / server['cpu_total']) * 100 if server['cpu_total'] > 0 else 0
                memory_usage = (server['mem_assigned'] / server['mem_total']) * 100 if server['mem_total'] > 0 else 0
                
                metrics['ob_server_cpu_usage'].labels(
                    svr_ip=server['svr_ip'],
                    svr_port=server['svr_port'],
                    zone=server['zone']
                ).set(cpu_usage)
                
                metrics['ob_server_memory_usage'].labels(
                    svr_ip=server['svr_ip'],
                    svr_port=server['svr_port'],
                    zone=server['zone']
                ).set(memory_usage)
            
            # 采集会话和事务数量
            cursor.execute('''
                SELECT 
                    s.svr_ip, 
                    s.svr_port, 
                    s.tenant_id, 
                    t.tenant_name,
                    COUNT(*) as session_count,
                    SUM(CASE WHEN s.status = 'ACTIVE' THEN 1 ELSE 0 END) as active_session_count
                FROM oceanbase.GV$OB_SESSIONS s
                JOIN oceanbase.DBA_OB_TENANTS t ON s.tenant_id = t.tenant_id
                GROUP BY s.svr_ip, s.svr_port, s.tenant_id, t.tenant_name
            ''')
            sessions = cursor.fetchall()
            
            for session in sessions:
                metrics['ob_session_count'].labels(
                    svr_ip=session['svr_ip'],
                    svr_port=session['svr_port'],
                    tenant_id=session['tenant_id'],
                    tenant_name=session['tenant_name']
                ).set(session['session_count'])
            
            print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 性能指标采集完成")
            
    finally:
        conn.close()

# 主程序
if __name__ == "__main__":
    # 启动Prometheus服务器
    prometheus_client.start_http_server(8000)
    
    # 初始化指标
    metrics = init_metrics()
    
    # 定时采集指标
    while True:
        collect_performance_metrics(metrics)
        time.sleep(30)

3. 告警脚本

功能:根据监控指标生成告警 适用场景

  • 异常情况及时通知
  • 自动化故障处理
  • 运维响应自动化

Python脚本示例

python
import pymysql
import time
import smtplib
from email.mime.text import MIMEText
from email.header import Header

# 连接OceanBase集群
def connect_ob():
    return pymysql.connect(
        host='127.0.0.1',
        port=2881,
        user='root',
        password='password',
        db='oceanbase',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )

# 发送邮件告警
def send_alert(subject, content):
    # 邮件配置
    smtp_server = 'smtp.example.com'
    smtp_port = 25
    sender = 'alert@example.com'
    receivers = ['admin@example.com']
    username = 'alert@example.com'
    password = 'password'
    
    # 创建邮件内容
    message = MIMEText(content, 'plain', 'utf-8')
    message['From'] = Header('OceanBase监控系统', 'utf-8')
    message['To'] = Header('运维团队', 'utf-8')
    message['Subject'] = Header(subject, 'utf-8')
    
    # 发送邮件
    try:
        smtp_obj = smtplib.SMTP(smtp_server, smtp_port)
        smtp_obj.login(username, password)
        smtp_obj.sendmail(sender, receivers, message.as_string())
        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 告警邮件发送成功")
    except smtplib.SMTPException as e:
        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 告警邮件发送失败: {e}")

# 检查集群告警
def check_cluster_alerts():
    conn = connect_ob()
    alerts = []
    
    try:
        with conn.cursor() as cursor:
            # 检查节点状态
            cursor.execute('''
                SELECT 
                    svr_ip, 
                    svr_port, 
                    zone, 
                    status
                FROM oceanbase.GV$OB_SERVER
                WHERE status != 'ACTIVE'
            ''')
            inactive_servers = cursor.fetchall()
            
            for server in inactive_servers:
                alerts.append(f"节点异常: {server['svr_ip']}:{server['svr_port']} (可用区: {server['zone']}) 状态: {server['status']}")
            
            # 检查CPU使用率
            cursor.execute('''
                SELECT 
                    svr_ip, 
                    svr_port, 
                    zone,
                    cpu_total, 
                    cpu_assigned
                FROM oceanbase.GV$OB_SERVER
                WHERE cpu_assigned / cpu_total > 0.8
            ''')
            high_cpu_servers = cursor.fetchall()
            
            for server in high_cpu_servers:
                cpu_usage = (server['cpu_assigned'] / server['cpu_total']) * 100
                alerts.append(f"CPU使用率过高: {server['svr_ip']}:{server['svr_port']} (可用区: {server['zone']}) 使用率: {cpu_usage:.2f}%")
            
            # 检查慢SQL数量
            cursor.execute('''
                SELECT 
                    s.svr_ip, 
                    s.svr_port, 
                    t.tenant_name,
                    COUNT(*) as slow_sql_count
                FROM oceanbase.GV$OB_SLOW_SQL s
                JOIN oceanbase.DBA_OB_TENANTS t ON s.tenant_id = t.tenant_id
                WHERE s.start_time >= NOW() - INTERVAL '5' MINUTE
                GROUP BY s.svr_ip, s.svr_port, t.tenant_name
                HAVING slow_sql_count > 10
            ''')
            high_slow_sql = cursor.fetchall()
            
            for item in high_slow_sql:
                alerts.append(f"慢SQL数量过多: {item['svr_ip']}:{item['svr_port']} (租户: {item['tenant_name']}) 5分钟内慢SQL数量: {item['slow_sql_count']}")
            
    finally:
        conn.close()
    
    return alerts

# 主程序
if __name__ == "__main__":
    alerts = check_cluster_alerts()
    
    if alerts:
        subject = "OceanBase集群告警"
        content = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] OceanBase集群出现异常情况:\n\n" + "\n".join(alerts)
        send_alert(subject, content)
    else:
        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 集群运行正常,无告警")

4. 日志分析脚本

功能:分析OceanBase日志文件 适用场景

  • 日志内容分析
  • 异常日志检测
  • 日志统计和报告

Python脚本示例

python
import re
import os
import time
from collections import defaultdict

# 分析OceanBase日志
def analyze_ob_log(log_file):
    # 定义日志模式
    error_pattern = re.compile(r'ERROR|FATAL|CRITICAL')
    warning_pattern = re.compile(r'WARN|WARNING')
    slow_sql_pattern = re.compile(r'slow query')
    
    # 初始化统计数据
    stats = {
        'error_count': 0,
        'warning_count': 0,
        'slow_sql_count': 0,
        'error_types': defaultdict(int),
        'warning_types': defaultdict(int)
    }
    
    # 打开日志文件
    with open(log_file, 'r') as f:
        for line in f:
            # 统计错误日志
            if error_pattern.search(line):
                stats['error_count'] += 1
                # 提取错误类型
                error_type = line.split(':')[2].strip() if len(line.split(':')) > 2 else 'Unknown'
                stats['error_types'][error_type] += 1
            
            # 统计警告日志
            if warning_pattern.search(line):
                stats['warning_count'] += 1
                # 提取警告类型
                warning_type = line.split(':')[2].strip() if len(line.split(':')) > 2 else 'Unknown'
                stats['warning_types'][warning_type] += 1
            
            # 统计慢SQL
            if slow_sql_pattern.search(line):
                stats['slow_sql_count'] += 1
    
    return stats

# 生成日志分析报告
def generate_log_report(stats, log_file):
    report = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] OceanBase日志分析报告\n"
    report += f"日志文件: {log_file}\n"
    report += "=" * 80 + "\n"
    report += f"错误日志数量: {stats['error_count']}\n"
    report += f"警告日志数量: {stats['warning_count']}\n"
    report += f"慢SQL数量: {stats['slow_sql_count']}\n"
    
    if stats['error_types']:
        report += "\n错误类型统计:\n"
        report += "-" * 80 + "\n"
        for error_type, count in sorted(stats['error_types'].items(), key=lambda x: x[1], reverse=True):
            report += f"{error_type}: {count}\n"
    
    if stats['warning_types']:
        report += "\n警告类型统计:\n"
        report += "-" * 80 + "\n"
        for warning_type, count in sorted(stats['warning_types'].items(), key=lambda x: x[1], reverse=True):
            report += f"{warning_type}: {count}\n"
    
    report += "=" * 80 + "\n"
    
    return report

# 主程序
if __name__ == "__main__":
    log_file = '/data/ob/log/observer.log'
    
    if os.path.exists(log_file):
        stats = analyze_ob_log(log_file)
        report = generate_log_report(stats, log_file)
        print(report)
        
        # 保存报告到文件
        report_file = f"/tmp/ob_log_analysis_{time.strftime('%Y%m%d%H%M%S')}.txt"
        with open(report_file, 'w') as f:
            f.write(report)
        print(f"报告已保存到: {report_file}")
    else:
        print(f"日志文件不存在: {log_file}")

脚本部署和管理

1. 脚本部署方式

功能:部署监控脚本到生产环境 适用场景

  • 监控脚本上线
  • 脚本更新和维护
  • 脚本批量部署

部署方式

1.1 定时任务部署

bash
# 使用crontab部署定时执行的脚本
crontab -e

# 添加以下内容,每5分钟执行一次集群状态监控脚本
*/5 * * * * python /path/to/monitor_cluster_status.py >> /var/log/ob_monitor.log 2>&1

# 每30分钟执行一次性能指标采集脚本
*/30 * * * * python /path/to/collect_performance_metrics.py >> /var/log/ob_performance.log 2>&1

# 每小时执行一次日志分析脚本
0 * * * * python /path/to/analyze_ob_log.py >> /var/log/ob_log_analysis.log 2>&1

1.2 系统服务部署

bash
# 创建systemd服务文件
cat > /etc/systemd/system/ob-monitor.service << EOF
[Unit]
Description=OceanBase Monitoring Service
After=network.target

[Service]
Type=simple
User=root
ExecStart=/usr/bin/python /path/to/collect_performance_metrics.py
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

# 启动服务并设置开机自启
systemctl daemon-reload
systemctl start ob-monitor
systemctl enable ob-monitor

2. 脚本监控和维护

功能:监控和维护已部署的监控脚本 适用场景

  • 脚本运行状态监控
  • 脚本日志管理
  • 脚本性能优化

监控方法

bash
# 检查脚本进程状态
ps aux | grep python | grep ob_monitor

# 查看脚本日志
tail -f /var/log/ob_monitor.log

# 检查脚本执行结果
cat /tmp/ob_cluster_status.txt

# 监控脚本资源使用情况
top -p $(pgrep -f collect_performance_metrics.py)

维护要点

  • 定期检查脚本运行状态
  • 及时清理和归档脚本日志
  • 根据业务需求调整脚本执行频率
  • 定期更新脚本,适应OceanBase版本变化
  • 监控脚本本身的性能,避免对系统造成过大负担

最佳实践

1. 脚本设计原则

  • 模块化设计:将脚本拆分为多个功能模块,便于维护和扩展
  • 错误处理:完善的错误处理机制,确保脚本在异常情况下能够正常退出
  • 日志记录:详细的日志记录,便于问题定位和分析
  • 配置分离:将配置信息与代码分离,便于不同环境的部署
  • 性能优化:优化脚本性能,减少对系统资源的占用
  • 安全考虑:保护敏感信息,如数据库密码等

2. 脚本执行频率

脚本类型建议执行频率说明
集群状态监控1-5分钟实时掌握集群状态
性能指标采集5-30分钟平衡实时性和系统开销
告警脚本1-5分钟及时发现和处理异常
日志分析脚本1小时-1天避免频繁读取大文件
数据备份脚本根据业务需求通常为每天或每周

3. 脚本安全管理

  • 权限控制:限制脚本的执行权限,仅允许特定用户执行
  • 密码管理:避免在脚本中硬编码密码,使用环境变量或配置文件加密存储
  • 网络安全:使用安全的连接方式,如SSL/TLS加密
  • 代码审计:定期审计脚本代码,发现和修复安全漏洞
  • 版本控制:使用版本控制系统管理脚本代码,便于追溯和回滚

4. 脚本自动化集成

  • 与监控系统集成:将脚本与Prometheus、Grafana等监控系统集成,实现可视化监控
  • 与告警系统集成:将脚本与Zabbix、Nagios等告警系统集成,实现自动化告警
  • 与自动化平台集成:将脚本与Ansible、SaltStack等自动化平台集成,实现批量部署和管理
  • 与CI/CD流水线集成:将脚本纳入CI/CD流水线,实现自动化测试和部署

常见问题(FAQ)

Q1: 如何确保监控脚本的可靠性?

A1: 确保监控脚本可靠性的方法:

  • 完善的错误处理机制,包括异常捕获和日志记录
  • 定期测试脚本,确保在各种情况下都能正常运行
  • 实现脚本自身的监控,如心跳检测等
  • 配置脚本的自动重启机制,如使用systemd的Restart选项
  • 定期检查脚本日志,及时发现和解决问题

Q2: 监控脚本对OceanBase集群性能有影响吗?

A2: 监控脚本对OceanBase集群性能的影响:

  • 合理设计的监控脚本对集群性能影响很小
  • 可以通过调整脚本执行频率和采样率来控制影响
  • 避免在高峰时段执行资源密集型脚本
  • 优化脚本代码,减少对数据库的查询次数和复杂度
  • 使用只读用户执行监控脚本,避免对数据库造成写操作

Q3: 如何处理监控脚本的并发执行问题?

A3: 处理监控脚本并发执行问题的方法:

  • 避免多个脚本同时执行相同的操作
  • 使用锁机制控制脚本的并发执行
  • 合理规划脚本的执行时间,避免冲突
  • 使用分布式调度系统,如Airflow、Celery等,统一管理脚本执行

Q4: 监控脚本如何适应OceanBase版本变化?

A4: 监控脚本适应OceanBase版本变化的方法:

  • 定期更新脚本,适配新的系统视图和参数
  • 使用版本兼容的API和查询语句
  • 实现脚本的版本检测和自适应机制
  • 建立脚本的测试环境,在新版本上线前进行测试

Q5: 如何提高监控脚本的可维护性?

A5: 提高监控脚本可维护性的方法:

  • 采用模块化设计,将功能拆分为独立的模块
  • 编写详细的注释和文档
  • 使用统一的代码风格和命名规范
  • 实现配置与代码分离,便于不同环境的部署
  • 使用版本控制系统管理脚本代码

Q6: 监控脚本如何处理大量数据?

A6: 监控脚本处理大量数据的方法:

  • 采用增量采集和处理方式,减少数据量
  • 使用采样机制,只采集部分数据
  • 实现数据压缩和归档,减少存储空间
  • 使用分布式处理框架,如Spark、Flink等,处理大规模数据
  • 优化数据存储结构,提高查询效率

Q7: 如何实现监控脚本的告警分级?

A7: 实现监控脚本告警分级的方法:

  • 根据告警的严重程度,将告警分为不同级别(如紧急、重要、警告、信息)
  • 为不同级别的告警配置不同的通知方式和接收人
  • 实现告警的抑制和聚合,避免告警风暴
  • 建立告警的升级机制,长时间未处理的告警自动升级

Q8: 监控脚本如何与云平台集成?

A8: 监控脚本与云平台集成的方法:

  • 使用云平台提供的SDK和API,实现与云监控系统的集成
  • 将监控数据发送到云平台的时序数据库,如阿里云TSDB、AWS CloudWatch等
  • 使用云平台提供的函数计算服务,实现无服务器的脚本执行
  • 与云平台的自动化运维工具集成,实现自动化故障处理

Q9: 如何实现监控脚本的可视化?

A9: 实现监控脚本可视化的方法:

  • 将监控数据发送到Prometheus,使用Grafana进行可视化展示
  • 使用Python的可视化库,如Matplotlib、Seaborn等,生成图表和报告
  • 与Web框架集成,开发自定义的监控仪表盘
  • 使用开源的监控可视化工具,如Kibana、Datadog等

Q10: 监控脚本如何处理跨地域集群的监控?

A10: 处理跨地域集群监控的方法:

  • 部署分布式监控架构,在每个地域部署监控脚本
  • 使用统一的监控数据采集和存储平台
  • 实现监控数据的跨地域同步和聚合
  • 考虑网络延迟和带宽限制,优化数据传输
  • 为不同地域的集群配置独立的告警规则和通知方式