Oracle11g一键巡检脚本(输出HTML格式)

一、Python脚本(完整代码)

import argparse
import glob
import html
import os
import re
import socket
import subprocess
from collections import deque
from datetime import datetime

try:
    import cx_Oracle
except ImportError:
    # Keep the module importable without the driver so the server-level checks
    # still run; get_db_info() reports the missing driver in the report.
    cx_Oracle = None

# Number of trailing lines inspected in each Oracle trace log.
_LOG_TAIL_LINES = 200
# Matches log lines worth reporting.
_LOG_PROBLEM_RE = re.compile(r"error|warning", re.IGNORECASE)
# Match the user/system figures in both the old ("1.2%us") and the newer
# ("1.2 us") layouts of top's "Cpu(s)" summary line.
_CPU_USER_RE = re.compile(r"(\d+(?:\.\d+)?)\s*%?\s*us\b")
_CPU_SYS_RE = re.compile(r"(\d+(?:\.\d+)?)\s*%?\s*sy\b")


def parse_args():
    """Parse the command-line options that configure the inspection run.

    Returns:
        argparse.Namespace with db_host, db_port, db_sid, db_user, db_pwd
        and html_output attributes.
    """
    parser = argparse.ArgumentParser(description="Oracle数据库与服务器巡检脚本(Python版)")
    parser.add_argument("--db-host", required=True, help="数据库主机地址(如127.0.0.1)")
    parser.add_argument("--db-port", default=1521, type=int, help="数据库端口(默认1521)")
    parser.add_argument("--db-sid", required=True, help="Oracle SID(如orcl11g)")
    parser.add_argument("--db-user", default="sys", help="数据库用户名(默认sys)")
    parser.add_argument("--db-pwd", required=True, help="数据库密码")
    parser.add_argument(
        "--html-output",
        default="./oracle_Healthcheck_{}.html",
        help="HTML报告输出路径(默认当前目录)",
    )
    return parser.parse_args()


def _run_command(argv):
    """Run *argv* and return its combined stdout/stderr as text.

    LC_ALL=C keeps labels and decimal separators stable for parsing.
    """
    env = dict(os.environ, LC_ALL="C")
    return subprocess.check_output(
        argv, stderr=subprocess.STDOUT, text=True, env=env, timeout=30
    )


def _format_bytes(num_bytes):
    """Render a byte count with a binary unit suffix (B, K, M, G, T)."""
    units = ("B", "K", "M", "G", "T")
    value = float(num_bytes)
    index = 0
    while value >= 1024 and index < len(units) - 1:
        value /= 1024
        index += 1
    return f"{value:.1f}{units[index]}"


def _read_os_version():
    """Return PRETTY_NAME from /etc/os-release, with surrounding quotes removed."""
    with open("/etc/os-release", "r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("PRETTY_NAME="):
                return line.partition("=")[2].strip().strip('"')
    raise ValueError("PRETTY_NAME not found in /etc/os-release")


def _parse_df(df_output):
    """Build the root filesystem usage summary from `df -hP /` output."""
    for line in df_output.splitlines():
        parts = line.split()
        if len(parts) >= 5 and parts[-1] == "/":
            return f"Disk Usage: {parts[2]}/{parts[1]} ({parts[4]})"
    raise ValueError("no '/' mount line in df output")


def _parse_free(free_output):
    """Build the memory usage summary from `free -b` output."""
    for line in free_output.splitlines():
        if line.startswith("Mem:"):
            parts = line.split()
            total, used = int(parts[1]), int(parts[2])
            percent = used / total * 100 if total else 0.0
            return (
                f"Total Memory: {_format_bytes(total)}, "
                f"Used Memory: {_format_bytes(used)}, "
                f"Memory Usage: {percent:.2f}%"
            )
    raise ValueError("no 'Mem:' line in free output")


def _parse_cpu(top_output):
    """Return user + system CPU usage from `top -bn1` output, e.g. '3.4%'."""
    for line in top_output.splitlines():
        if "Cpu(s)" not in line:
            continue
        user = _CPU_USER_RE.search(line)
        system = _CPU_SYS_RE.search(line)
        if user and system:
            return f"{float(user.group(1)) + float(system.group(1)):.1f}%"
    raise ValueError("no parsable 'Cpu(s)' line in top output")


def _probe(check):
    """Run one best-effort check and return its result or a failure message.

    Each server check is independent: a failing one must not abort the whole
    inspection, so any exception is turned into the text shown in the report.
    """
    try:
        return check()
    except Exception as e:
        return f"获取失败:{str(e)}"


def get_server_info():
    """Collect server-level facts: hostname, OS version, disk, memory, CPU.

    Returns:
        dict with the keys "hostname", "os_version", "storage", "memory" and
        "cpu". Every value is a string; a check that fails yields a
        "获取失败:..." message instead of raising.
    """
    return {
        "hostname": socket.gethostname(),
        "os_version": _probe(_read_os_version),
        # -P keeps each filesystem on a single line even for long device names.
        "storage": _probe(lambda: _parse_df(_run_command(["df", "-hP", "/"]))),
        # Raw bytes avoid parsing unit suffixes, which vary (G, Gi, M, Mi, ...).
        "memory": _probe(lambda: _parse_free(_run_command(["free", "-b"]))),
        "cpu": _probe(lambda: _parse_cpu(_run_command(["top", "-bn1"]))),
    }


def _collect_error_logs(db_sid):
    """Return ERROR/WARNING lines found in the tail of each Oracle trace log."""
    # NOTE(review): the diag path is hard-coded and assumes the db_unique_name
    # directory equals the SID - confirm for the target environment.
    log_path = f"/u01/app/oracle/diag/rdbms/{db_sid}/{db_sid}/trace/*.log"
    error_logs = []
    for log_file in glob.glob(log_path):
        log_name = os.path.basename(log_file)
        try:
            with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
                # deque keeps only the tail instead of loading the whole file.
                tail = deque(f, maxlen=_LOG_TAIL_LINES)
            for line in tail:
                if _LOG_PROBLEM_RE.search(line):
                    error_logs.append(f"[{log_name}] {line.strip()}")
        except Exception as e:
            error_logs.append(f"读取{log_name}失败:{str(e)}")
    return "\n".join(error_logs) if error_logs else "无报错信息"


def _fetch_rows(cursor, sql):
    """Execute *sql* on *cursor* and return all result rows."""
    cursor.execute(sql)
    return cursor.fetchall()


def get_db_info(db_host, db_port, db_sid, db_user, db_pwd):
    """Collect database-level inspection results over a SYSDBA connection.

    Args:
        db_host: Database host address.
        db_port: Listener port.
        db_sid: Oracle SID; also used to locate the trace logs.
        db_user: Database user name.
        db_pwd: Database password.

    Returns:
        dict of report strings keyed by "error_logs", "backups",
        "abnormal_tasks", "active_sessions", "archive_count",
        "tablespace_usage", "abnormal_users", "archive_status",
        "flashback_size" and "lock_count". On any failure the dict carries a
        "db_error" message instead of raising.
    """
    db_info = {}
    if cx_Oracle is None:
        db_info["db_error"] = "数据库连接/查询失败:cx_Oracle is not installed (pip install cx_Oracle)"
        return db_info

    conn = None
    try:
        dsn = cx_Oracle.makedsn(db_host, db_port, sid=db_sid)
        conn = cx_Oracle.connect(
            user=db_user, password=db_pwd, dsn=dsn, mode=cx_Oracle.SYSDBA
        )
        cursor = conn.cursor()

        # 1. Problem lines from the tail of each trace log.
        db_info["error_logs"] = _collect_error_logs(db_sid)

        # 2. Backup pieces completed within the last day.
        backup_rows = _fetch_rows(cursor, """
            SELECT fname backup_file_name, status, device_type, completion_time backup_finish_time
            FROM v$backup_files
            WHERE file_type = 'PIECE' AND bs_completion_time > SYSDATE - 1
        """)
        backup_header = "BACKUP_FILE_NAME | STATUS | DEVICE_TYPE | BACKUP_FINISH_TIME\n"
        backup_header += "-" * 80 + "\n"
        backup_content = backup_header + "\n".join(
            f"{row[0]} | {row[1]} | {row[2]} | {row[3]}" for row in backup_rows
        )
        db_info["backups"] = backup_content if backup_rows else "近1天无备份记录"

        # 3. Autotask jobs from the last two days that did not succeed.
        task_rows = _fetch_rows(cursor, """
            SELECT job_name, job_status, job_start_time
            FROM dba_autotask_job_history
            WHERE job_start_time > SYSDATE - 2 AND job_status != 'SUCCEEDED'
        """)
        task_content = "\n".join(
            f"任务名:{row[0]} | 状态:{row[1]} | 开始时间:{row[2]}" for row in task_rows
        )
        db_info["abnormal_tasks"] = task_content if task_rows else "近2天无异常任务"

        # 4. Active sessions per user.
        session_rows = _fetch_rows(cursor, """
            SELECT vs.username, COUNT(*) active_session
            FROM v$session vs
            WHERE vs.status = 'ACTIVE'
            GROUP BY vs.username
        """)
        session_content = "\n".join(
            f"用户名:{row[0]} | 活动会话数:{row[1]}" for row in session_rows
        )
        db_info["active_sessions"] = session_content if session_rows else "无活动会话"

        # 5. Archived logs generated within the last day.
        cursor.execute("SELECT COUNT(*) FROM v$archived_log WHERE first_time > SYSDATE - 1")
        db_info["archive_count"] = str(cursor.fetchone()[0])

        # 6. Tablespace usage percentage.
        ts_rows = _fetch_rows(cursor, """
            SELECT df.tablespace_name, ROUND((df.bytes - NVL(free.bytes, 0))/df.bytes*100, 2) used_percent
            FROM (SELECT tablespace_name, SUM(bytes) bytes FROM dba_data_files GROUP BY tablespace_name) df
            LEFT JOIN (SELECT tablespace_name, SUM(bytes) bytes FROM dba_free_space GROUP BY tablespace_name) free
            ON df.tablespace_name = free.tablespace_name
        """)
        db_info["tablespace_usage"] = "\n".join(
            f"表空间:{row[0]} | 使用率:{row[1]}%" for row in ts_rows
        )

        # 7. Non-open accounts locked within 30 days or expiring within +/-7 days.
        user_rows = _fetch_rows(cursor, """
            SELECT username, lock_date, expiry_date
            FROM dba_users
            WHERE account_status != 'OPEN'
              AND created >= (SELECT TRUNC(created) FROM dba_users WHERE username = 'SYS') + 0.99999
              AND (lock_date >= SYSDATE - 30 OR (expiry_date BETWEEN SYSDATE -7 AND SYSDATE +7))
            ORDER BY created
        """)
        user_content = "\n".join(
            f"用户名:{row[0]} | 锁定时间:{row[1]} | 过期时间:{row[2]}" for row in user_rows
        )
        db_info["abnormal_users"] = user_content if user_rows else "无异常状态用户"

        # 8. Archiving status as reported by the log_archive_start parameter.
        # NOTE(review): this parameter is deprecated; v$database.log_mode is
        # presumably the value that was meant - confirm before relying on it.
        cursor.execute("SELECT value FROM v$parameter WHERE name = 'log_archive_start'")
        db_info["archive_status"] = cursor.fetchone()[0].strip()

        # 9. Flash recovery area size, converted from bytes to GB.
        cursor.execute("SELECT value FROM v$parameter WHERE name = 'db_recovery_file_dest_size'")
        flashback_bytes = int(cursor.fetchone()[0])
        db_info["flashback_size"] = f"{flashback_bytes / (1024 ** 3):.2f} GB"

        # 10. Number of lock groups (held / waiting) present in v$lock.
        # NOTE(review): this counts groups (0-2), not individual locks.
        lock_rows = _fetch_rows(
            cursor,
            "SELECT COUNT(*) FROM v$lock GROUP BY DECODE(request, 0, 'held', 'waiting')",
        )
        db_info["lock_count"] = str(len(lock_rows))

    except cx_Oracle.Error as e:
        db_info["db_error"] = f"数据库连接/查询失败:{str(e)}"
    except Exception as e:
        db_info["db_error"] = f"未知错误:{str(e)}"
    finally:
        if conn:
            conn.close()

    return db_info


def generate_html_report(server_info, db_info, db_sid, output_path):
    """Render the inspection results as an HTML file.

    Args:
        server_info: dict produced by get_server_info().
        db_info: dict produced by get_db_info(); when it contains "db_error"
            only that message is shown in the database section.
        db_sid: Oracle SID shown in the headings and substituted into
            *output_path*.
        output_path: Target path; a "{}" placeholder is replaced by *db_sid*.

    All dynamic values are HTML-escaped so log lines or error messages that
    contain markup characters cannot break the report.
    """
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sid = html.escape(str(db_sid))
    server = {
        key: html.escape(str(server_info.get(key, "")))
        for key in ("hostname", "os_version", "storage", "memory", "cpu")
    }

    html_content = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>Oracle 健康巡检报告 - {sid}</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        h1, h2 {{ color: #333; border-bottom: 1px solid #ddd; padding-bottom: 5px; }}
        table {{ width: 100%; border-collapse: collapse; margin: 15px 0; }}
        th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
        th {{ background-color: #f5f5f5; }}
        pre {{ background-color: #f8f8f8; padding: 10px; border-radius: 4px; overflow-x: auto; }}
    </style>
</head>
<body>
    <h1>XXXX项目</h1>
    <h2>Oracle 健康巡检报告 - {sid}</h2>
    <p>巡检时间:{current_time} &nbsp;&nbsp;&nbsp; 巡检人员:zhh</p>

    <!-- 服务器层面巡检 -->
    <h2>一、服务器层面巡检</h2>
    <table>
        <tr><th>巡检项</th><th>结果</th></tr>
        <tr><td>主机名</td><td>{server['hostname']}</td></tr>
        <tr><td>操作系统版本</td><td>{server['os_version']}</td></tr>
        <tr><td>根目录存储使用情况</td><td>{server['storage']}</td></tr>
        <tr><td>内存使用情况</td><td>{server['memory']}</td></tr>
        <tr><td>CPU使用率</td><td>{server['cpu']}</td></tr>
    </table>

    <!-- 数据库层面巡检 -->
    <h2>二、数据库层面巡检({sid})</h2>
    <table>
        <tr><th>巡检项</th><th>查询逻辑</th><th>结果</th></tr>
"""

    if "db_error" in db_info:
        error_text = html.escape(str(db_info["db_error"]))
        html_content += (
            f'        <tr><td colspan="3" style="color: red;">{error_text}</td></tr>\n'
        )
    else:
        # (title, how it was obtained, db_info key, render inside <pre>?)
        db_items = [
            ("数据库异常日志", "tail 日志文件最后200行 + 筛选ERROR/WARNING", "error_logs", True),
            ("近1天备份情况", "查询v$backup_files", "backups", True),
            ("近2天异常计划任务", "查询dba_autotask_job_history", "abnormal_tasks", True),
            ("活动会话", "查询v$session(STATUS='ACTIVE')", "active_sessions", True),
            ("近1天归档日志数量", "查询v$archived_log", "archive_count", False),
            ("表空间使用率", "dba_data_files + dba_free_space计算", "tablespace_usage", True),
            ("异常状态用户", "查询dba_users(锁定/过期)", "abnormal_users", True),
            ("归档日志开启状态", "查询v$parameter(log_archive_start)", "archive_status", False),
            ("闪回区大小", "查询v$parameter(db_recovery_file_dest_size)", "flashback_size", False),
            ("锁表分组数量", "查询v$lock分组统计", "lock_count", False),
        ]
        rows = []
        for item_name, logic, key, preformatted in db_items:
            result = html.escape(str(db_info.get(key, "")))
            if preformatted:
                result = f"<pre>{result}</pre>"
            rows.append(
                f"        <tr><td>{item_name}</td><td>{logic}</td><td>{result}</td></tr>\n"
            )
        html_content += "".join(rows)

    html_content += """    </table>
</body>
</html>
"""

    final_output = output_path.format(db_sid)
    with open(final_output, "w", encoding="utf-8") as f:
        f.write(html_content)
    print(f"巡检报告已生成:{final_output}")


def main():
    """Run the full inspection: parse options, collect data, write the report."""
    args = parse_args()
    # Example client library path; an existing setting is left untouched.
    # NOTE: the dynamic loader reads LD_LIBRARY_PATH when the process starts,
    # so for the Oracle client to be found it should be exported in the shell
    # before launching this script; setting it here only affects children.
    os.environ.setdefault("LD_LIBRARY_PATH", "/u01/app/oracle/product/11.2.0/db_1/lib")

    print("正在获取服务器信息...")
    server_info = get_server_info()

    print("正在获取数据库信息...")
    db_info = get_db_info(
        db_host=args.db_host,
        db_port=args.db_port,
        db_sid=args.db_sid,
        db_user=args.db_user,
        db_pwd=args.db_pwd,
    )

    print("正在生成HTML报告...")
    generate_html_report(server_info, db_info, args.db_sid, args.html_output)
    print("巡检完成!")


if __name__ == "__main__":
    main()

二、使用前准备

  1. 安装依赖库

    # 安装cx_Oracle(用于Oracle数据库连接)
    pip install cx_Oracle
  2. 配置Oracle客户端

    • 若服务器未安装Oracle数据库,需单独安装Oracle Instant Client(轻量级客户端)。
    • 设置环境变量LD_LIBRARY_PATH指向客户端库路径(脚本中已包含示例,需根据实际路径调整)。
  3. 权限要求

    • 执行脚本的用户需具备:
      • 读取服务器文件权限(/etc/os-release、Oracle日志文件)。
      • 执行系统命令权限(df、free、top)。
      • Oracle数据库SYSDBA权限(用于查询系统视图)。

三、执行命令示例

python oracle_healthcheck.py    --db-host 192.168.1.100    --db-port 1521    --db-sid orcl11g    --db-user sys    --db-pwd YourSysPassword    --html-output ./oracle_report_{}.html 

发表评论

评论已关闭。

相关文章