外观
OceanBase 监控脚本
核心概念
监控脚本是指用于自动化采集、分析和处理OceanBase数据库各种指标和状态信息的脚本程序。监控脚本可以帮助DBA和运维人员实时掌握数据库的运行状态,及时发现和处理潜在问题,提高数据库的可用性和可靠性。OceanBase监控脚本通常包括集群状态监控、性能指标采集、告警脚本、日志分析脚本等类型,可以通过多种方式部署和执行,如定时任务、监控系统集成等。
脚本开发基础
1. 环境准备
功能:准备监控脚本开发和运行的环境 适用场景:
- 监控脚本开发
- 脚本测试和调试
- 脚本部署和运行
环境要求:
- Python 3.6+ 或 Shell 环境
- OceanBase 客户端工具(如 obclient、ocp-cli)
- 必要的Python库(如 pymysql、requests、prometheus_client 等)
安装示例:
bash
# 安装Python依赖
pip install pymysql requests prometheus_client pandas
# 安装OceanBase客户端
yum install -y obclient2. 连接OceanBase集群
功能:建立与OceanBase集群的连接 适用场景:
- 从OceanBase集群获取监控数据
- 执行管理命令
- 检查集群状态
Python连接示例:
python
import pymysql
# 连接OceanBase集群
def connect_ob(host, port, user, password, db):
conn = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
db=db,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
return conn
# 使用示例
conn = connect_ob('127.0.0.1', 2881, 'root', 'password', 'oceanbase')
cursor = conn.cursor()
cursor.execute('SELECT * FROM oceanbase.GV$OB_SERVER')
result = cursor.fetchall()Shell连接示例:
bash
# 使用obclient连接OceanBase集群
obclient -h127.0.0.1 -P2881 -uroot -ppassword -Doceanbase -c核心监控脚本
1. 集群状态监控脚本
功能:监控OceanBase集群的整体状态 适用场景:
- 实时监控集群健康状态
- 发现集群异常节点
- 跟踪集群状态变化
Python脚本示例:
python
import pymysql
import time
# 连接OceanBase集群
def connect_ob():
return pymysql.connect(
host='127.0.0.1',
port=2881,
user='root',
password='password',
db='oceanbase',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
# 监控集群状态
def monitor_cluster_status():
conn = connect_ob()
try:
with conn.cursor() as cursor:
# 查询集群节点状态
cursor.execute('''
SELECT
s.svr_ip,
s.svr_port,
s.status,
s.zone,
s.start_service_time,
s.stop_time
FROM oceanbase.GV$OB_SERVER s
''')
servers = cursor.fetchall()
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 集群节点状态监控")
print("-" * 80)
print("IP地址 端口 状态 可用区 启动时间 停止时间")
print("-" * 80)
for server in servers:
start_time = server['start_service_time'].strftime('%Y-%m-%d %H:%M:%S') if server['start_service_time'] else "-"
stop_time = server['stop_time'].strftime('%Y-%m-%d %H:%M:%S') if server['stop_time'] else "-"
print(f"{server['svr_ip']} {server['svr_port']:5d} {server['status']:6s} {server['zone']:4s} {start_time} {stop_time}")
# 查询集群整体状态
cursor.execute('''
SELECT
cluster_id,
cluster_name,
status
FROM oceanbase.DBA_OB_CLUSTERS
''')
cluster = cursor.fetchone()
print("\n" + "-" * 80)
print(f"集群ID: {cluster['cluster_id']}, 集群名称: {cluster['cluster_name']}, 状态: {cluster['status']}")
print("-" * 80)
finally:
conn.close()
# 主程序
if __name__ == "__main__":
monitor_cluster_status()2. 性能指标采集脚本
功能:采集OceanBase集群的性能指标 适用场景:
- 性能趋势分析
- 性能问题定位
- 性能优化评估
Python脚本示例:
python
import pymysql
import time
import prometheus_client
from prometheus_client import Gauge
# 连接OceanBase集群
def connect_ob():
return pymysql.connect(
host='127.0.0.1',
port=2881,
user='root',
password='password',
db='oceanbase',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
# 初始化Prometheus指标
def init_metrics():
metrics = {
'ob_server_cpu_usage': Gauge('ob_server_cpu_usage', 'CPU使用率', ['svr_ip', 'svr_port', 'zone']),
'ob_server_memory_usage': Gauge('ob_server_memory_usage', '内存使用率', ['svr_ip', 'svr_port', 'zone']),
'ob_server_disk_usage': Gauge('ob_server_disk_usage', '磁盘使用率', ['svr_ip', 'svr_port', 'zone', 'device']),
'ob_session_count': Gauge('ob_session_count', '会话数量', ['svr_ip', 'svr_port', 'tenant_id', 'tenant_name']),
'ob_active_transaction_count': Gauge('ob_active_transaction_count', '活跃事务数量', ['svr_ip', 'svr_port', 'tenant_id', 'tenant_name']),
'ob_slow_sql_count': Gauge('ob_slow_sql_count', '慢SQL数量', ['svr_ip', 'svr_port', 'tenant_id', 'tenant_name'])
}
return metrics
# 采集性能指标
def collect_performance_metrics(metrics):
conn = connect_ob()
try:
with conn.cursor() as cursor:
# 采集节点CPU和内存使用率
cursor.execute('''
SELECT
svr_ip,
svr_port,
zone,
cpu_total,
cpu_assigned,
mem_total,
mem_assigned
FROM oceanbase.GV$OB_SERVER
''')
servers = cursor.fetchall()
for server in servers:
cpu_usage = (server['cpu_assigned'] / server['cpu_total']) * 100 if server['cpu_total'] > 0 else 0
memory_usage = (server['mem_assigned'] / server['mem_total']) * 100 if server['mem_total'] > 0 else 0
metrics['ob_server_cpu_usage'].labels(
svr_ip=server['svr_ip'],
svr_port=server['svr_port'],
zone=server['zone']
).set(cpu_usage)
metrics['ob_server_memory_usage'].labels(
svr_ip=server['svr_ip'],
svr_port=server['svr_port'],
zone=server['zone']
).set(memory_usage)
# 采集会话和事务数量
cursor.execute('''
SELECT
s.svr_ip,
s.svr_port,
s.tenant_id,
t.tenant_name,
COUNT(*) as session_count,
SUM(CASE WHEN s.status = 'ACTIVE' THEN 1 ELSE 0 END) as active_session_count
FROM oceanbase.GV$OB_SESSIONS s
JOIN oceanbase.DBA_OB_TENANTS t ON s.tenant_id = t.tenant_id
GROUP BY s.svr_ip, s.svr_port, s.tenant_id, t.tenant_name
''')
sessions = cursor.fetchall()
for session in sessions:
metrics['ob_session_count'].labels(
svr_ip=session['svr_ip'],
svr_port=session['svr_port'],
tenant_id=session['tenant_id'],
tenant_name=session['tenant_name']
).set(session['session_count'])
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 性能指标采集完成")
finally:
conn.close()
# 主程序
if __name__ == "__main__":
# 启动Prometheus服务器
prometheus_client.start_http_server(8000)
# 初始化指标
metrics = init_metrics()
# 定时采集指标
while True:
collect_performance_metrics(metrics)
time.sleep(30)3. 告警脚本
功能:根据监控指标生成告警 适用场景:
- 异常情况及时通知
- 自动化故障处理
- 运维响应自动化
Python脚本示例:
python
import pymysql
import time
import smtplib
from email.mime.text import MIMEText
from email.header import Header
# 连接OceanBase集群
def connect_ob():
return pymysql.connect(
host='127.0.0.1',
port=2881,
user='root',
password='password',
db='oceanbase',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
# 发送邮件告警
def send_alert(subject, content):
# 邮件配置
smtp_server = 'smtp.example.com'
smtp_port = 25
sender = 'alert@example.com'
receivers = ['admin@example.com']
username = 'alert@example.com'
password = 'password'
# 创建邮件内容
message = MIMEText(content, 'plain', 'utf-8')
message['From'] = Header('OceanBase监控系统', 'utf-8')
message['To'] = Header('运维团队', 'utf-8')
message['Subject'] = Header(subject, 'utf-8')
# 发送邮件
try:
smtp_obj = smtplib.SMTP(smtp_server, smtp_port)
smtp_obj.login(username, password)
smtp_obj.sendmail(sender, receivers, message.as_string())
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 告警邮件发送成功")
except smtplib.SMTPException as e:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 告警邮件发送失败: {e}")
# 检查集群告警
def check_cluster_alerts():
conn = connect_ob()
alerts = []
try:
with conn.cursor() as cursor:
# 检查节点状态
cursor.execute('''
SELECT
svr_ip,
svr_port,
zone,
status
FROM oceanbase.GV$OB_SERVER
WHERE status != 'ACTIVE'
''')
inactive_servers = cursor.fetchall()
for server in inactive_servers:
alerts.append(f"节点异常: {server['svr_ip']}:{server['svr_port']} (可用区: {server['zone']}) 状态: {server['status']}")
# 检查CPU使用率
cursor.execute('''
SELECT
svr_ip,
svr_port,
zone,
cpu_total,
cpu_assigned
FROM oceanbase.GV$OB_SERVER
WHERE cpu_assigned / cpu_total > 0.8
''')
high_cpu_servers = cursor.fetchall()
for server in high_cpu_servers:
cpu_usage = (server['cpu_assigned'] / server['cpu_total']) * 100
alerts.append(f"CPU使用率过高: {server['svr_ip']}:{server['svr_port']} (可用区: {server['zone']}) 使用率: {cpu_usage:.2f}%")
# 检查慢SQL数量
cursor.execute('''
SELECT
s.svr_ip,
s.svr_port,
t.tenant_name,
COUNT(*) as slow_sql_count
FROM oceanbase.GV$OB_SLOW_SQL s
JOIN oceanbase.DBA_OB_TENANTS t ON s.tenant_id = t.tenant_id
WHERE s.start_time >= NOW() - INTERVAL '5' MINUTE
GROUP BY s.svr_ip, s.svr_port, t.tenant_name
HAVING slow_sql_count > 10
''')
high_slow_sql = cursor.fetchall()
for item in high_slow_sql:
alerts.append(f"慢SQL数量过多: {item['svr_ip']}:{item['svr_port']} (租户: {item['tenant_name']}) 5分钟内慢SQL数量: {item['slow_sql_count']}")
finally:
conn.close()
return alerts
# 主程序
if __name__ == "__main__":
alerts = check_cluster_alerts()
if alerts:
subject = "OceanBase集群告警"
content = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] OceanBase集群出现异常情况:\n\n" + "\n".join(alerts)
send_alert(subject, content)
else:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 集群运行正常,无告警")4. 日志分析脚本
功能:分析OceanBase日志文件 适用场景:
- 日志内容分析
- 异常日志检测
- 日志统计和报告
Python脚本示例:
python
import re
import os
import time
from collections import defaultdict
# 分析OceanBase日志
def analyze_ob_log(log_file):
# 定义日志模式
error_pattern = re.compile(r'ERROR|FATAL|CRITICAL')
warning_pattern = re.compile(r'WARN|WARNING')
slow_sql_pattern = re.compile(r'slow query')
# 初始化统计数据
stats = {
'error_count': 0,
'warning_count': 0,
'slow_sql_count': 0,
'error_types': defaultdict(int),
'warning_types': defaultdict(int)
}
# 打开日志文件
with open(log_file, 'r') as f:
for line in f:
# 统计错误日志
if error_pattern.search(line):
stats['error_count'] += 1
# 提取错误类型
error_type = line.split(':')[2].strip() if len(line.split(':')) > 2 else 'Unknown'
stats['error_types'][error_type] += 1
# 统计警告日志
if warning_pattern.search(line):
stats['warning_count'] += 1
# 提取警告类型
warning_type = line.split(':')[2].strip() if len(line.split(':')) > 2 else 'Unknown'
stats['warning_types'][warning_type] += 1
# 统计慢SQL
if slow_sql_pattern.search(line):
stats['slow_sql_count'] += 1
return stats
# 生成日志分析报告
def generate_log_report(stats, log_file):
report = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] OceanBase日志分析报告\n"
report += f"日志文件: {log_file}\n"
report += "=" * 80 + "\n"
report += f"错误日志数量: {stats['error_count']}\n"
report += f"警告日志数量: {stats['warning_count']}\n"
report += f"慢SQL数量: {stats['slow_sql_count']}\n"
if stats['error_types']:
report += "\n错误类型统计:\n"
report += "-" * 80 + "\n"
for error_type, count in sorted(stats['error_types'].items(), key=lambda x: x[1], reverse=True):
report += f"{error_type}: {count}\n"
if stats['warning_types']:
report += "\n警告类型统计:\n"
report += "-" * 80 + "\n"
for warning_type, count in sorted(stats['warning_types'].items(), key=lambda x: x[1], reverse=True):
report += f"{warning_type}: {count}\n"
report += "=" * 80 + "\n"
return report
# 主程序
if __name__ == "__main__":
log_file = '/data/ob/log/observer.log'
if os.path.exists(log_file):
stats = analyze_ob_log(log_file)
report = generate_log_report(stats, log_file)
print(report)
# 保存报告到文件
report_file = f"/tmp/ob_log_analysis_{time.strftime('%Y%m%d%H%M%S')}.txt"
with open(report_file, 'w') as f:
f.write(report)
print(f"报告已保存到: {report_file}")
else:
print(f"日志文件不存在: {log_file}")脚本部署和管理
1. 脚本部署方式
功能:部署监控脚本到生产环境 适用场景:
- 监控脚本上线
- 脚本更新和维护
- 脚本批量部署
部署方式:
1.1 定时任务部署
bash
# 使用crontab部署定时执行的脚本
crontab -e
# 添加以下内容,每5分钟执行一次集群状态监控脚本
*/5 * * * * python /path/to/monitor_cluster_status.py >> /var/log/ob_monitor.log 2>&1
# 每30分钟执行一次性能指标采集脚本
*/30 * * * * python /path/to/collect_performance_metrics.py >> /var/log/ob_performance.log 2>&1
# 每小时执行一次日志分析脚本
0 * * * * python /path/to/analyze_ob_log.py >> /var/log/ob_log_analysis.log 2>&11.2 系统服务部署
bash
# 创建systemd服务文件
cat > /etc/systemd/system/ob-monitor.service << EOF
[Unit]
Description=OceanBase Monitoring Service
After=network.target
[Service]
Type=simple
User=root
ExecStart=/usr/bin/python /path/to/collect_performance_metrics.py
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
EOF
# 启动服务并设置开机自启
systemctl daemon-reload
systemctl start ob-monitor
systemctl enable ob-monitor2. 脚本监控和维护
功能:监控和维护已部署的监控脚本 适用场景:
- 脚本运行状态监控
- 脚本日志管理
- 脚本性能优化
监控方法:
bash
# 检查脚本进程状态
ps aux | grep python | grep ob_monitor
# 查看脚本日志
tail -f /var/log/ob_monitor.log
# 检查脚本执行结果
cat /tmp/ob_cluster_status.txt
# 监控脚本资源使用情况
top -p $(pgrep -f collect_performance_metrics.py)维护要点:
- 定期检查脚本运行状态
- 及时清理和归档脚本日志
- 根据业务需求调整脚本执行频率
- 定期更新脚本,适应OceanBase版本变化
- 监控脚本本身的性能,避免对系统造成过大负担
最佳实践
1. 脚本设计原则
- 模块化设计:将脚本拆分为多个功能模块,便于维护和扩展
- 错误处理:完善的错误处理机制,确保脚本在异常情况下能够正常退出
- 日志记录:详细的日志记录,便于问题定位和分析
- 配置分离:将配置信息与代码分离,便于不同环境的部署
- 性能优化:优化脚本性能,减少对系统资源的占用
- 安全考虑:保护敏感信息,如数据库密码等
2. 脚本执行频率
| 脚本类型 | 建议执行频率 | 说明 |
|---|---|---|
| 集群状态监控 | 1-5分钟 | 实时掌握集群状态 |
| 性能指标采集 | 5-30分钟 | 平衡实时性和系统开销 |
| 告警脚本 | 1-5分钟 | 及时发现和处理异常 |
| 日志分析脚本 | 1小时-1天 | 避免频繁读取大文件 |
| 数据备份脚本 | 根据业务需求 | 通常为每天或每周 |
3. 脚本安全管理
- 权限控制:限制脚本的执行权限,仅允许特定用户执行
- 密码管理:避免在脚本中硬编码密码,使用环境变量或配置文件加密存储
- 网络安全:使用安全的连接方式,如SSL/TLS加密
- 代码审计:定期审计脚本代码,发现和修复安全漏洞
- 版本控制:使用版本控制系统管理脚本代码,便于追溯和回滚
4. 脚本自动化集成
- 与监控系统集成:将脚本与Prometheus、Grafana等监控系统集成,实现可视化监控
- 与告警系统集成:将脚本与Zabbix、Nagios等告警系统集成,实现自动化告警
- 与自动化平台集成:将脚本与Ansible、SaltStack等自动化平台集成,实现批量部署和管理
- 与CI/CD流水线集成:将脚本纳入CI/CD流水线,实现自动化测试和部署
常见问题(FAQ)
Q1: 如何确保监控脚本的可靠性?
A1: 确保监控脚本可靠性的方法:
- 完善的错误处理机制,包括异常捕获和日志记录
- 定期测试脚本,确保在各种情况下都能正常运行
- 实现脚本自身的监控,如心跳检测等
- 配置脚本的自动重启机制,如使用systemd的Restart选项
- 定期检查脚本日志,及时发现和解决问题
Q2: 监控脚本对OceanBase集群性能有影响吗?
A2: 监控脚本对OceanBase集群性能的影响:
- 合理设计的监控脚本对集群性能影响很小
- 可以通过调整脚本执行频率和采样率来控制影响
- 避免在高峰时段执行资源密集型脚本
- 优化脚本代码,减少对数据库的查询次数和复杂度
- 使用只读用户执行监控脚本,避免对数据库造成写操作
Q3: 如何处理监控脚本的并发执行问题?
A3: 处理监控脚本并发执行问题的方法:
- 避免多个脚本同时执行相同的操作
- 使用锁机制控制脚本的并发执行
- 合理规划脚本的执行时间,避免冲突
- 使用分布式调度系统,如Airflow、Celery等,统一管理脚本执行
Q4: 监控脚本如何适应OceanBase版本变化?
A4: 监控脚本适应OceanBase版本变化的方法:
- 定期更新脚本,适配新的系统视图和参数
- 使用版本兼容的API和查询语句
- 实现脚本的版本检测和自适应机制
- 建立脚本的测试环境,在新版本上线前进行测试
Q5: 如何提高监控脚本的可维护性?
A5: 提高监控脚本可维护性的方法:
- 采用模块化设计,将功能拆分为独立的模块
- 编写详细的注释和文档
- 使用统一的代码风格和命名规范
- 实现配置与代码分离,便于不同环境的部署
- 使用版本控制系统管理脚本代码
Q6: 监控脚本如何处理大量数据?
A6: 监控脚本处理大量数据的方法:
- 采用增量采集和处理方式,减少数据量
- 使用采样机制,只采集部分数据
- 实现数据压缩和归档,减少存储空间
- 使用分布式处理框架,如Spark、Flink等,处理大规模数据
- 优化数据存储结构,提高查询效率
Q7: 如何实现监控脚本的告警分级?
A7: 实现监控脚本告警分级的方法:
- 根据告警的严重程度,将告警分为不同级别(如紧急、重要、警告、信息)
- 为不同级别的告警配置不同的通知方式和接收人
- 实现告警的抑制和聚合,避免告警风暴
- 建立告警的升级机制,长时间未处理的告警自动升级
Q8: 监控脚本如何与云平台集成?
A8: 监控脚本与云平台集成的方法:
- 使用云平台提供的SDK和API,实现与云监控系统的集成
- 将监控数据发送到云平台的时序数据库,如阿里云TSDB、AWS CloudWatch等
- 使用云平台提供的函数计算服务,实现无服务器的脚本执行
- 与云平台的自动化运维工具集成,实现自动化故障处理
Q9: 如何实现监控脚本的可视化?
A9: 实现监控脚本可视化的方法:
- 将监控数据发送到Prometheus,使用Grafana进行可视化展示
- 使用Python的可视化库,如Matplotlib、Seaborn等,生成图表和报告
- 与Web框架集成,开发自定义的监控仪表盘
- 使用开源的监控可视化工具,如Kibana、Datadog等
Q10: 监控脚本如何处理跨地域集群的监控?
A10: 处理跨地域集群监控的方法:
- 部署分布式监控架构,在每个地域部署监控脚本
- 使用统一的监控数据采集和存储平台
- 实现监控数据的跨地域同步和聚合
- 考虑网络延迟和带宽限制,优化数据传输
- 为不同地域的集群配置独立的告警规则和通知方式
