一、Python脚本(完整代码)
import subprocess import os import socket import re import glob import cx_Oracle import argparse from datetime import datetime def parse_args(): """解析命令行参数,支持灵活配置巡检参数""" parser = argparse.ArgumentParser(description="Oracle数据库与服务器巡检脚本(Python版)") parser.add_argument("--db-host", required=True, help="数据库主机地址(如127.0.0.1)") parser.add_argument("--db-port", default=1521, type=int, help="数据库端口(默认1521)") parser.add_argument("--db-sid", required=True, help="Oracle SID(如orcl11g)") parser.add_argument("--db-user", default="sys", help="数据库用户名(默认sys)") parser.add_argument("--db-pwd", required=True, help="数据库密码") parser.add_argument("--html-output", default="./oracle_Healthcheck_{}.html", help="HTML报告输出路径(默认当前目录)") return parser.parse_args() def get_server_info(): """获取服务器层面信息:主机名、OS版本、存储、内存、CPU""" server_info = {} # 1. 主机名 server_info["hostname"] = socket.gethostname() # 2. 操作系统版本(读取/etc/os-release) try: with open("/etc/os-release", "r", encoding="utf-8") as f: for line in f: if line.startswith("PRETTY_NAME="): server_info["os_version"] = line.strip().split('"')[1] break except Exception as e: server_info["os_version"] = f"获取失败:{str(e)}" # 3. 根目录存储使用情况(df -h /) try: df_result = subprocess.check_output(["df", "-h", "/"], stderr=subprocess.STDOUT, text=True) # 提取关键行(排除标题行) df_line = [line for line in df_result.splitlines() if line.endswith("/")][0] df_parts = df_line.split() server_info["storage"] = f"Disk Usage: {df_parts[2]}/{df_parts[1]} ({df_parts[4]})" except Exception as e: server_info["storage"] = f"获取失败:{str(e)}" # 4. 内存使用情况(free -h) try: free_result = subprocess.check_output(["free", "-h"], stderr=subprocess.STDOUT, text=True) free_line = free_result.splitlines()[1] # 第二行为内存详情 free_parts = free_line.split() total_mem = free_parts[1] used_mem = free_parts[2] # 修正原bash脚本的计算错误(原$3*0.1/$2逻辑错误,改为实际使用率) used_percent = (float(free_parts[2].replace("G", "")) / float(total_mem.replace("G", ""))) * 100 server_info["memory"] = f"Total Memory: {total_mem}, Used Memory: {used_mem}, Memory Usage: {used_percent:.2f}%" except Exception as e: server_info["memory"] = f"获取失败:{str(e)}" # 5. CPU使用率(top -bn1 提取用户+系统CPU) try: top_result = subprocess.check_output(["top", "-bn1"], stderr=subprocess.STDOUT, text=True) cpu_line = [line for line in top_result.splitlines() if line.startswith("Cpu(s)")][0] cpu_parts = re.findall(r"d+.d+", cpu_line) user_cpu = float(cpu_parts[0]) sys_cpu = float(cpu_parts[2]) server_info["cpu"] = f"{user_cpu + sys_cpu:.1f}%" except Exception as e: server_info["cpu"] = f"获取失败:{str(e)}" return server_info def get_db_info(db_host, db_port, db_sid, db_user, db_pwd): """获取数据库层面信息,返回字典格式""" db_info = {} dsn = cx_Oracle.makedsn(db_host, db_port, sid=db_sid) conn = None try: # 以SYSDBA权限连接数据库 conn = cx_Oracle.connect(user=db_user, password=db_pwd, dsn=dsn, mode=cx_Oracle.SYSDBA) cursor = conn.cursor() # 1. 数据库异常日志(最后200行含ERROR的内容) log_path = f"/u01/app/oracle/diag/rdbms/{db_sid}/{db_sid}/trace/*.log" error_logs = [] for log_file in glob.glob(log_path): try: # 读取文件最后200行 with open(log_file, "r", encoding="utf-8", errors="ignore") as f: lines = f.readlines()[-200:] # 筛选含ERROR/WARNING的行 for line in lines: if re.search(r"error|warning", line, re.IGNORECASE): error_logs.append(f"[{os.path.basename(log_file)}] {line.strip()}") except Exception as e: error_logs.append(f"读取{os.path.basename(log_file)}失败:{str(e)}") db_info["error_logs"] = "n".join(error_logs) if error_logs else "无报错信息" # 2. 数据库备份情况(近1天的备份) backup_sql = """ SELECT fname backup_file_name, status, device_type, completion_time backup_finish_time FROM v$backup_files WHERE file_type = 'PIECE' AND bs_completion_time > SYSDATE - 1 """ cursor.execute(backup_sql) backup_rows = cursor.fetchall() backup_header = "BACKUP_FILE_NAME | STATUS | DEVICE_TYPE | BACKUP_FINISH_TIMEn" backup_header += "-" * 80 + "n" backup_content = backup_header + "n".join([f"{row[0]} | {row[1]} | {row[2]} | {row[3]}" for row in backup_rows]) db_info["backups"] = backup_content if backup_rows else "近1天无备份记录" # 3. 异常计划任务(近2天未成功的任务) task_sql = """ SELECT job_name, job_status, job_start_time FROM dba_autotask_job_history WHERE job_start_time > SYSDATE - 2 AND job_status != 'SUCCEEDED' """ cursor.execute(task_sql) task_rows = cursor.fetchall() task_content = "n".join([f"任务名:{row[0]} | 状态:{row[1]} | 开始时间:{row[2]}" for row in task_rows]) db_info["abnormal_tasks"] = task_content if task_rows else "近2天无异常任务" # 4. 数据库活动会话 session_sql = """ SELECT vs.username, COUNT(*) active_session FROM v$session vs WHERE vs.status = 'ACTIVE' GROUP BY vs.username """ cursor.execute(session_sql) session_rows = cursor.fetchall() session_content = "n".join([f"用户名:{row[0]} | 活动会话数:{row[1]}" for row in session_rows]) db_info["active_sessions"] = session_content if session_rows else "无活动会话" # 5. 近1天归档日志数量 archive_sql = "SELECT COUNT(*) FROM v$archived_log WHERE first_time > SYSDATE - 1" cursor.execute(archive_sql) db_info["archive_count"] = str(cursor.fetchone()[0]) # 6. 表空间使用率 tablespace_sql = """ SELECT df.tablespace_name, ROUND((df.bytes - NVL(free.bytes, 0))/df.bytes*100, 2) used_percent FROM (SELECT tablespace_name, SUM(bytes) bytes FROM dba_data_files GROUP BY tablespace_name) df LEFT JOIN (SELECT tablespace_name, SUM(bytes) bytes FROM dba_free_space GROUP BY tablespace_name) free ON df.tablespace_name = free.tablespace_name """ cursor.execute(tablespace_sql) ts_rows = cursor.fetchall() ts_content = "n".join([f"表空间:{row[0]} | 使用率:{row[1]}%" for row in ts_rows]) db_info["tablespace_usage"] = ts_content # 7. 异常状态用户(近30天锁定/7天内过期) user_sql = """ SELECT username, lock_date, expiry_date FROM dba_users WHERE account_status != 'OPEN' AND created >= (SELECT TRUNC(created) FROM dba_users WHERE username = 'SYS') + 0.99999 AND (lock_date >= SYSDATE - 30 OR (expiry_date BETWEEN SYSDATE -7 AND SYSDATE +7)) ORDER BY created """ cursor.execute(user_sql) user_rows = cursor.fetchall() user_content = "n".join([f"用户名:{row[0]} | 锁定时间:{row[1]} | 过期时间:{row[2]}" for row in user_rows]) db_info["abnormal_users"] = user_content if user_rows else "无异常状态用户" # 8. 归档日志开启状态(log_archive_start参数) archive_status_sql = "SELECT value FROM v$parameter WHERE name = 'log_archive_start'" cursor.execute(archive_status_sql) db_info["archive_status"] = cursor.fetchone()[0].strip() # 9. 闪回区大小(转换为GB,增强可读性) flashback_sql = "SELECT value FROM v$parameter WHERE name = 'db_recovery_file_dest_size'" cursor.execute(flashback_sql) flashback_bytes = int(cursor.fetchone()[0]) flashback_gb = flashback_bytes / (1024 ** 3) # 字节转GB db_info["flashback_size"] = f"{flashback_gb:.2f} GB" # 10. 锁表数量 lock_sql = "SELECT COUNT(*) FROM v$lock GROUP BY DECODE(request, 0, 'held', 'waiting')" cursor.execute(lock_sql) lock_count = len(cursor.fetchall()) db_info["lock_count"] = str(lock_count) except cx_Oracle.Error as e: # 捕获Oracle数据库错误 db_info["db_error"] = f"数据库连接/查询失败:{str(e)}" except Exception as e: db_info["db_error"] = f"未知错误:{str(e)}" finally: if conn: conn.close() return db_info def generate_html_report(server_info, db_info, db_sid, output_path): """生成HTML巡检报告""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") html_content = f""" <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>Oracle 健康巡检报告 - {db_sid}</title> <style> body {{ font-family: Arial, sans-serif; margin: 20px; }} h1, h2 {{ color: #333; border-bottom: 1px solid #ddd; padding-bottom: 5px; }} table {{ width: 100%; border-collapse: collapse; margin: 15px 0; }} th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }} th {{ background-color: #f5f5f5; }} pre {{ background-color: #f8f8f8; padding: 10px; border-radius: 4px; overflow-x: auto; }} </style> </head> <body> <h1>XXXX项目</h1> <h2>Oracle 健康巡检报告 - {db_sid}</h2> <p>巡检时间:{current_time} 巡检人员:zhh</p> <!-- 服务器层面巡检 --> <h2>一、服务器层面巡检</h2> <table> <tr><th>巡检项</th><th>结果</th></tr> <tr><td>主机名</td><td>{server_info['hostname']}</td></tr> <tr><td>操作系统版本</td><td>{server_info['os_version']}</td></tr> <tr><td>根目录存储使用情况</td><td>{server_info['storage']}</td></tr> <tr><td>内存使用情况</td><td>{server_info['memory']}</td></tr> <tr><td>CPU使用率</td><td>{server_info['cpu']}</td></tr> </table> <!-- 数据库层面巡检 --> <h2>二、数据库层面巡检({db_sid})</h2> <table> <tr><th>巡检项</th><th>查询逻辑</th><th>结果</th></tr> """ # 数据库巡检项拼接(处理可能的连接错误) if "db_error" in db_info: html_content += f""" <tr><td colspan="3" style="color: red;">{db_info['db_error']}</td></tr> """ else: db_items = [ ("数据库异常日志", "tail 日志文件最后200行 + 筛选ERROR/WARNING", f"<pre>{db_info['error_logs']}</pre>"), ("近1天备份情况", "查询v$backup_files", f"<pre>{db_info['backups']}</pre>"), ("近2天异常计划任务", "查询dba_autotask_job_history", f"<pre>{db_info['abnormal_tasks']}</pre>"), ("活动会话", "查询v$session(STATUS='ACTIVE')", f"<pre>{db_info['active_sessions']}</pre>"), ("近1天归档日志数量", "查询v$archived_log", db_info['archive_count']), ("表空间使用率", "dba_data_files + dba_free_space计算", f"<pre>{db_info['tablespace_usage']}</pre>"), ("异常状态用户", "查询dba_users(锁定/过期)", f"<pre>{db_info['abnormal_users']}</pre>"), ("归档日志开启状态", "查询v$parameter(log_archive_start)", db_info['archive_status']), ("闪回区大小", "查询v$parameter(db_recovery_file_dest_size)", db_info['flashback_size']), ("锁表分组数量", "查询v$lock分组统计", db_info['lock_count']) ] for item_name, logic, result in db_items: html_content += f""" <tr><td>{item_name}</td><td>{logic}</td><td>{result}</td></tr> """ # HTML尾部 html_content += """ </table> </body> </html> """ # 写入HTML文件 final_output = output_path.format(db_sid) with open(final_output, "w", encoding="utf-8") as f: f.write(html_content) print(f"巡检报告已生成:{final_output}") def main(): # 解析参数 args = parse_args() # 设置Oracle客户端环境(根据实际环境调整,若已配置可注释) os.environ["LD_LIBRARY_PATH"] = "/u01/app/oracle/product/11.2.0/db_1/lib" # 示例路径 # 1. 获取服务器信息 print("正在获取服务器信息...") server_info = get_server_info() # 2. 获取数据库信息 print("正在获取数据库信息...") db_info = get_db_info( db_host=args.db_host, db_port=args.db_port, db_sid=args.db_sid, db_user=args.db_user, db_pwd=args.db_pwd ) # 3. 生成HTML报告 print("正在生成HTML报告...") generate_html_report(server_info, db_info, args.db_sid, args.html_output) print("巡检完成!") if __name__ == "__main__": main()
二、使用前准备
-
安装依赖库
# 安装cx_Oracle(用于Oracle数据库连接) pip install cx_Oracle -
配置Oracle客户端
- 若服务器未安装Oracle数据库,需单独安装Oracle Instant Client(轻量级客户端)。
- 设置环境变量
LD_LIBRARY_PATH指向客户端库路径(脚本中已包含示例,需根据实际路径调整)。
-
权限要求
- 执行脚本的用户需具备:
- 读取服务器文件权限(
/etc/os-release、Oracle日志文件)。 - 执行系统命令权限(
df、free、top)。 - Oracle数据库
SYSDBA权限(用于查询系统视图)。
- 读取服务器文件权限(
- 执行脚本的用户需具备:
三、执行命令示例
python oracle_healthcheck.py --db-host 192.168.1.100 --db-port 1521 --db-sid orcl11g --db-user sys --db-pwd YourSysPassword --html-output ./oracle_report_{}.html