014-Linux 体系安全审计与合规检测平台实现 linux系统的安装步骤
Linux 体系安全审计与合规检测平台实现
随着信息技术的广泛应用, 体系安全 难题日益凸显。特别是在企业和组织的IT基础设施中,Linux 体系因其稳定性和开源特性被广泛采用,但同时也面临着各种安全威胁。建立一个有效的安全审计与合规检测平台,对于保障Linux 体系的安全运行至关重要。本章将详细介绍 怎样从零开始设计和实现一个Linux 体系安全审计平台,帮助管理员及时发现 体系安全隐患并确保 体系符合安全基线标准。
14.1 平台功能介绍
Linux 体系安全审计平台 一个集中式管理工具,主要用于对企业内部Linux服务器进行安全检查、合规性评估和安全策略执行。该平台通过定期扫描、实时监控和综合分析,帮助管理员全面了解 体系安全状况,及时发现并处理潜在安全风险。
核心功能模块
基线检查功能
基线检查是安全审计的基础,用于检查 体系是否符合预定义的安全标准。主要包括:
体系配置检查:检查操作 体系关键配置文件的安全设置,如密码策略、登录策略等 账户安全检查:检查 体系用户账户设置,识别弱密码、特权账户和未授权账户 服务与端口检查:检查 体系开放的服务和端口,识别不必要的服务和可能的安全漏洞 文件权限检查:检查关键 体系文件和目录的权限设置,确保符合最小权限 制度 内核参数检查:检查 体系内核安全参数配置,确保 体系级安全性
漏洞扫描功能
漏洞扫描功能用于发现 体系中存在的安全漏洞,包括:
体系组件漏洞:扫描操作 体系组件的已知漏洞 应用软件漏洞:扫描安装的应用软件的已知漏洞 Web应用漏洞:针对Web服务的特定漏洞扫描 数据库漏洞:针对数据库 体系的安全漏洞扫描 漏洞风险评估:对发现的漏洞进行风险等级评估
合规性检测功能
合规性检测用于确保 体系符合相关的安全标准和法规要求:
行业标准合规:如CIS(互联网安全中心)基准、NIST(美国 民族标准与技术研究院)指南 组织内部安全策略:符合企业自定义的安全策略 法规合规:如GDPR、HIPAA等法规要求的技术控制 合规报告生成:生成详细的合规性报告,显示符合和不符合项
安全事件监控功能
实时监控 体系安全 情形,及时发现安全事件:
日志分析:收集和分析 体系日志,识别可疑活动 文件完整性监控:监控关键 体系文件的变更 异常行为检测:检测 体系中的异常行为和活动 入侵检测:基于 制度和异常的入侵检测 告警机制:配置灵活的告警 制度,通过多种渠道发送告警
安全加固功能
针对发现的安全 难题提供修复和加固措施:
自动修复:对某些安全 难题提供自动修复功能 修复建议:对需要手动干预的 难题提供详细修复指南 批量操作:支持对多个 体系同时执行修复操作 修复验证:修复后自动验证 难题是否已解决
报表与分析功能
提供全面的报表和分析功能,帮助 领会 体系安全状况:
安全评分:基于多种 影响计算 体系安全评分 动向分析:分析安全状况随 时刻的变化 动向 风险热图:可视化显示 体系风险分布 定制报表:支持生成定制化的安全报表 数据导出:支持多种格式的数据导出
14.2 体系架构设计
为了实现上述功能,我们需要设计一个灵活、可扩展且高效的 体系架构。本节将详细介绍Linux 体系安全审计平台的总体架构设计。
总体架构
安全审计平台采用典型的客户端-服务器架构,由 下面内容主要组件组成:
服务器端:
Web应用服务器:提供用户界面和API服务 数据库服务器:存储 体系数据和审计 结局 扫描引擎:执行各类安全扫描和检测任务 任务调度器:管理和分配扫描任务 告警服务:处理和分发各类安全告警
客户端代理:
安装在被监控的Linux服务器上 负责收集本地数据和执行安全检查 实时监控 体系 情形和安全事件 与服务器端保持通信,报告安全状况
用户访问层:
Web管理界面:基于浏览器的管理控制台 命令行工具:用于 高 质量操作和批量管理 移动应用:提供移动设备访问功能 API接口:供第三方 体系集成
技术架构
服务器端技术选择
编程语言:Python 3.8+ Web框架:Django 3.2 LTS 数据库:PostgreSQL 13 (主数据库), Redis 6.0 (缓存和队列) 消息队列:RabbitMQ 3.8 任务调度:Celery 5.0 身份认证:OAuth 2.0 / LDAP 集成 Web服务器:Nginx + Gunicorn 容器化:Docker 20.10+ 和 Docker Compose 监控:Prometheus + Grafana
客户端代理技术选择
编程语言:Python 3.6+ (兼容较旧的Linux 体系) 通信协议:HTTPS RESTful API / gRPC 数据收集:基于 体系原生工具和定制脚本 本地数据库:SQLite (用于离线操作) 加密:TLS 1.3 / 证书认证
数据流设计
扫描与检测流程:
服务器下发扫描任务到客户端代理 客户端代理执行本地检查和数据收集 代理将 结局上传至服务器 服务器处理和分析数据 生成报告和告警
实时监控流程:
客户端代理持续监控 体系 情形 检测到异常时立即上报 服务器分析异常情况 触发告警机制
修复与加固流程:
用户从界面选择修复操作 服务器生成修复任务 客户端代理执行修复操作 代理上报修复 结局 服务器验证修复成功
扩展性与容错设计
水平扩展:
服务器组件可独立扩展 使用负载均衡分散请求
高可用性:
关键服务冗余部署 自动故障转移机制
数据安全:
数据库定期备份 敏感数据加密存储
离线 职业:
客户端支持离线执行检查 重连后自动同步数据
14.3 数据库结构设计
数据库是安全审计平台的核心组成部分,用于存储 体系配置、扫描 结局、安全事件和各类分析数据。本节将详细介绍数据库的结构设计。
14.3.1 数据库分析
在设计数据库之前,我们需要分析 体系的数据需求和访问模式:
数据类型分析:
配置数据: 体系配置、扫描策略、用户设置等 资产数据:被管理的服务器信息、网络拓扑、应用组件等 扫描 结局:基线检查、漏洞扫描、合规性检测的 结局 事件数据:安全事件、告警记录、操作日志等 统计数据:聚合统计信息、历史 动向、评分数据等
数据量与增长预估:
资产数据:增长缓慢,与企业服务器规模相关 扫描 结局:随扫描频率线性增长,需要分表或归档策略 事件数据:可能出现突发增长,需要弹性存储设计 日志数据:持续增长,需要轮转和归档策略
访问模式分析:
写多读少:事件记录、日志数据 读多写少:配置数据、资产信息 读写平衡:扫描 结局、统计数据 批量操作:扫描任务创建、 结局导入
性能需求:
实时监控数据需要快速写入和查询 报表生成可能涉及复杂聚合查询 历史数据查询可以接受较长响应 时刻
14.3.2 数据字典
基于上述分析,我们定义 下面内容主要数据实体和属性:
用户和权限相关表
用户表 (users)
id: 用户ID (主键) username: 用户名 password_hash: 密码哈希 e il: 电子邮箱 full_name: 全名 is_active: 是否激活 is_admin: 是否管理员 last_login: 最后登录 时刻 created_at: 创建 时刻 updated_at: 更新 时刻
角色表 (roles)
id: 角色ID (主键) name: 角色名称 description: 角色描述 created_at: 创建 时刻 updated_at: 更新 时刻
权限表 (permissions)
id: 权限ID (主键) name: 权限名称 code: 权限代码 description: 权限描述 created_at: 创建 时刻
用户角色关联表 (user_roles)
id: 关联ID (主键) user_id: 用户ID (外键) role_id: 角色ID (外键) created_at: 创建 时刻
角色权限关联表 (role_permissions)
id: 关联ID (主键) role_id: 角色ID (外键) permission_id: 权限ID (外键) created_at: 创建 时刻
资产管理相关表
主机表 (hosts)
id: 主机ID (主键) hostname: 主机名 ip_address: IP地址 os_type: 操作 体系类型 os_version: 操作 体系版本 kernel_version: 内核版本 architecture: 体系架构 status: 情形 (在线、离线、维护中) agent_version: 代理版本 last_seen: 最后在线 时刻 created_at: 创建 时刻 updated_at: 更新 时刻
主机组表 (host_groups)
id: 组ID (主键) name: 组名称 description: 组描述 created_at: 创建 时刻 updated_at: 更新 时刻
主机组关联表 (host_group_members)
id: 关联ID (主键) host_id: 主机ID (外键) group_id: 组ID (外键) created_at: 创建 时刻
应用服务表 (services)
id: 服务ID (主键) host_id: 主机ID (外键) name: 服务名称 service_type: 服务类型 version: 版本 status: 情形 port: 端口号 path: 安装路径 created_at: 创建 时刻 updated_at: 更新 时刻
安全检查相关表
安全策略表 (security_policies)
id: 策略ID (主键) name: 策略名称 description: 策略描述 policy_type: 策略类型 (基线、漏洞、合规) is_active: 是否激活 created_by: 创建者ID (外键) created_at: 创建 时刻 updated_at: 更新 时刻
检查项表 (check_items)
id: 检查项ID (主键) policy_id: 策略ID (外键) code: 检查项代码 name: 检查项名称 description: 检查项描述 category: 类别 severity: 严重性 check_type: 检查类型 check_script: 检查脚本/命令 expected_result: 预期 结局 remediation: 修复建议 created_at: 创建 时刻 updated_at: 更新 时刻
扫描任务表 (scan_tasks)
id: 任务ID (主键) name: 任务名称 policy_id: 策略ID (外键) status: 情形 scheduled_time: 规划 时刻 start_time: 开始 时刻 end_time: 结束 时刻 created_by: 创建者ID (外键) created_at: 创建 时刻 updated_at: 更新 时刻
扫描目标表 (scan_targets)
id: 目标ID (主键) task_id: 任务ID (外键) host_id: 主机ID (外键) group_id: 组ID (外键,可为空) status: 情形 created_at: 创建 时刻
扫描 结局表 (scan_results)
id: 结局ID (主键) task_id: 任务ID (外键) host_id: 主机ID (外键) check_id: 检查项ID (外键) status: 情形 (通过、失败、警告、不适用) actual_value: 实际值 details: 详细信息 scanned_at: 扫描 时刻 created_at: 创建 时刻
漏洞管理相关表
漏洞库表 (vulnerabilities)
id: 漏洞ID (主键) cve_id: CVE编号 title: 漏洞 深入了解 description: 漏洞描述 severity: 严重性 cvss_score: CVSS评分 affected_products: 受影响产品 references: 参考链接 published_date: 公布日期 created_at: 创建 时刻 updated_at: 更新 时刻
主机漏洞表 (host_vulnerabilities)
id: 记录ID (主键) host_id: 主机ID (外键) vulnerability_id: 漏洞ID (外键) status: 情形 (未修复、已修复、已忽略) details: 详细信息 detected_at: 检测 时刻 remediated_at: 修复 时刻 created_at: 创建 时刻 updated_at: 更新 时刻
合规管理相关表
合规标准表 (compliance_standards)
id: 标准ID (主键) code: 标准代码 name: 标准名称 description: 标准描述 version: 版本 created_at: 创建 时刻 updated_at: 更新 时刻
合规控制点表 (compliance_controls)
id: 控制点ID (主键) standard_id: 标准ID (外键) control_id: 控制点编号 name: 控制点名称 description: 控制点描述 category: 类别 created_at: 创建 时刻 updated_at: 更新 时刻
控制点映射表 (control_check_ pping)
id: 映射ID (主键) control_id: 控制点ID (外键) check_id: 检查项ID (外键) created_at: 创建 时刻
事件和告警相关表
安全事件表 (security_events)
id: 事件ID (主键) host_id: 主机ID (外键) event_type: 事件类型 source: 事件来源 severity: 严重性 message: 事件消息 details: 详细信息 event_time: 事件 时刻 created_at: 创建 时刻
告警表 (alerts)
id: 告警ID (主键) event_id: 事件ID (外键,可为空) result_id: 扫描 结局ID (外键,可为空) vulnerability_id: 漏洞ID (外键,可为空) host_id: 主机ID (外键) alert_type: 告警类型 severity: 严重性 status: 情形 (未处理、已确认、已解决、已忽略) message: 告警消息 details: 详细信息 generated_at: 生成 时刻 acknowledged_by: 确认人ID (外键,可为空) acknowledged_at: 确认 时刻 resolved_at: 解决 时刻 created_at: 创建 时刻 updated_at: 更新 时刻
告警 制度表 (alert_rules)
id: 制度ID (主键) name: 制度名称 description: 制度描述 rule_type: 制度类型 conditions: 触发条件 severity: 严重性 actions: 触发动作 is_active: 是否激活 created_by: 创建者ID (外键) created_at: 创建 时刻 updated_at: 更新 时刻
体系配置相关表
体系配置表 (system_settings)
id: 配置ID (主键) category: 类别 key: 键名 value: 值 description: 描述 created_at: 创建 时刻 updated_at: 更新 时刻
调度任务表 (scheduled_tasks)
id: 任务ID (主键) name: 任务名称 task_type: 任务类型 frequency: 频率 cron_expression: Cron表达式 parameters: 参数 is_active: 是否激活 last_run: 上次运行 时刻 next_run: 下次运行 时刻 created_at: 创建 时刻 updated_at: 更新 时刻
14.4 体系环境部署
本节将详细介绍Linux 体系安全审计平台的环境部署 经过,包括 体系环境要求、服务器配置和客户端代理部署。
14.4.1 体系环境说明
服务器端环境要求
硬件需求:
CPU:至少4核,推荐8核以上 内存:至少8GB,推荐16GB以上 存储:根据数据量,至少100GB,建议使用SSD 网络:千兆网络连接
软件需求:
操作 体系:Ubuntu 20.04 LTS / CentOS 8 / RHEL 8 Docker:20.10 或更高版本 Docker Compose:2.0 或更高版本 Python:3.8 或更高版本 PostgreSQL:13 或更高版本 Redis:6.0 或更高版本 Nginx:1.18 或更高版本
网络需求:
开放端口:80/443 (HTTP/HTTPS),5432 (PostgreSQL),6379 (Redis) 允许与客户端代理通信(通常通过HTTPS) 建议使用内网部署,或通过VPN访问
客户端代理环境要求
硬件需求:
最低配置:1核CPU,512MB内存 建议配置:2核CPU,1GB内存
软件需求:
操作 体系:各种Linux发行版,包括但不限于:
RHEL/CentOS 7+ Ubuntu 16.04+ Debian 9+ SUSE Linux Enterprise 12+
Python:3.6 或更高版本 基本 体系工具:bash, curl, openssl, sudo
权限需求:
默认需要root权限或sudo权限进行安全检查 可配置为特权有限的模式(部分功能会受限)
14.4.2 上报主机配置
客户端代理是安全审计平台的关键组件,负责在被监控的Linux服务器上收集数据并执行安全检查。本小节将详细介绍客户端代理的安装和配置 经过。
客户端代理架构
客户端代理采用模块化设计,主要包括 下面内容组件:
核心服务:
主服务进程,负责协调各个模块 与服务器通信的客户端组件 本地数据存储和缓存
数据收集模块:
体系信息收集器 配置文件解析器 服务和进程监控器
安全检查模块:
基线检查执行器 漏洞扫描组件 合规性检测组件
实时监控模块:
文件完整性监控 日志监控和分析 体系活动监控
修复执行模块:
自动修复执行器 修复 结局验证器
客户端代理安装
安装脚本方式
下面内容 一个典型的客户端代理安装脚本:
bash
#!/bin/bash # 设置变量 AGENT_VERSION="1.0.0" SERVER_URL="https://security-audit-server.example.com" API_KEY="your-api-key-here" INSTALL_DIR="/opt/security-audit-agent" LOG_DIR="/var/log/security-audit-agent" CONFIG_DIR="/etc/security-audit-agent" # 检查 体系要求 check_requirements() { echo "检查 体系要求..." # 检查操作 体系 if [ -f /etc/os-release ]; then . /etc/os-release echo "操作 体系: $NAME $VERSION_ID" else echo "无法确定操作 体系版本" exit 1 fi # 检查Python版本 if com nd -v python3 >/dev/null 2>&1; then PYTHON_VERSION=$(python3 --version | awk '{print $2}') echo "Python版本: $PYTHON_VERSION" # 检查Python版本是否满足要求 PYTHON_MAJOR=$(echo $PYTHON_VERSION | cut -d. -f1) PYTHON_MINOR=$(echo $PYTHON_VERSION | cut -d. -f2) if [ "$PYTHON_MAJOR" -lt 3 ] || ([ "$PYTHON_MAJOR" -eq 3 ] && [ "$PYTHON_MINOR" -lt 6 ]); then echo "错误: 需要Python 3.6或更高版本" exit 1 fi else echo "错误: 未找到Python 3" exit 1 fi # 检查必要的 体系工具 for cmd in curl openssl sudo; do if ! com nd -v $cmd >/dev/null 2>&1; then echo "错误: 未找到$cmd命令" exit 1 fi done echo " 体系要求检查通过" } # 创建必要的目录 create_directories() { echo "创建必要的目录..." mkdir -p $INSTALL_DIR mkdir -p $LOG_DIR mkdir -p $CONFIG_DIR # 设置适当的权限 chmod 750 $INSTALL_DIR chmod 750 $LOG_DIR chmod 750 $CONFIG_DIR } # 下载代理程序包 download_agent() { echo "下载代理程序包..." curl -sSL "$SERVER_URL/download/agent-$AGENT_VERSION.tar.gz" -o /tmp/agent.tar.gz if [ $? -ne 0 ]; then echo "错误: 下载代理程序包失败" exit 1 fi # 验证下载的文件 if com nd -v sha256sum >/dev/null 2>&1; then curl -sSL "$SERVER_URL/download/agent-$AGENT_VERSION.sha256" -o /tmp/agent.sha256 echo "验证下载文件的完整性..." cd /tmp && sha256sum -c agent.sha256 if [ $? -ne 0 ]; then echo "错误: 文件验证失败" exit 1 fi fi # 解压文件 echo "解压代理程序包..." tar -xzf /tmp/agent.tar.gz -C $INSTALL_DIR if [ $? -ne 0 ]; then echo "错误: 解压代理程序包失败" exit 1 fi } # 安装Python依赖 install_dependencies() { echo "安装Python依赖..." if com nd -v pip3 >/dev/null 2>&1; then pip3 install -r $INSTALL_DIR/requirements.txt else python3 -m pip install -r $INSTALL_DIR/requirements.txt fi if [ $? -ne 0 ]; then echo "错误: 安装Python依赖失败" exit 1 fi } # 配置代理 configure_agent() { echo "配置代理..." # 创建配置文件 cat > $CONFIG_DIR/agent.conf << EOF [agent] version = $AGENT_VERSION install_dir = $INSTALL_DIR log_dir = $LOG_DIR [server] url = $SERVER_URL api_key = $API_KEY [logging] level = INFO x_size = 10MB backup_count = 5 [security] ssl_verify = true EOF # 生成唯一标识 AGENT_ID=$(hostname -f | sha256sum | cut -d' ' -f1) echo "agent_id = $AGENT_ID" >> $CONFIG_DIR/agent.conf # 收集 体系信息 OS_NAME=$(grep -oP '(?<=^ID=).+' /etc/os-release | tr -d '"') OS_VERSION=$(grep -oP '(?<=^VERSION_ID=).+' /etc/os-release | tr -d '"') KERNEL=$(uname -r) # 创建 体系信息文件 cat > $CONFIG_DIR/system_info.conf << EOF [system] hostname = $(hostname -f) os_name = $OS_NAME os_version = $OS_VERSION kernel = $KERNEL architecture = $(uname -m) cpu_cores = $(grep -c processor /proc/cpuinfo) memory_total = $(grep MemTotal /proc/meminfo | awk '{print $2}') EOF } # 安装 体系服务 install_service() { echo "安装 体系服务..." # 创建systemd服务文件 cat > /etc/systemd/system/security-audit-agent.service << EOF [Unit] Description=Security Audit Agent After=network.target [Service] Type= User=root ExecStart=$INSTALL_DIR/bin/agent.py --config $CONFIG_DIR/agent.conf Restart=on-failure RestartSec=10 WorkingDirectory=$INSTALL_DIR [Install] WantedBy=multi-user.target EOF # 重新加载systemd配置 systemctl daemon-reload # 启用并启动服务 systemctl enable security-audit-agent systemctl start security-audit-agent # 检查服务 情形 SERVICE_STATUS=$(systemctl is-active security-audit-agent) if [ "$SERVICE_STATUS" != "active" ]; then echo "警告: 服务启动失败,请检查日志" echo "日志位置: $LOG_DIR/agent.log" exit 1 fi echo "服务安装并启动成功" } # 注册代理到服务器 register_agent() { echo "注册代理到服务器..." REGISTER_DATA="{"agent_id":"$AGENT_ID","hostname":"$(hostname -f)","ip":"$(hostname -I | awk '{print $1}')","os_type":"$OS_NAME","os_version":"$OS_VERSION","kernel_version":"$KERNEL"}" REGISTER_RESULT=$(curl -sSL -X POST -H "Content-Type: application/json" -H "Authorization: ApiKey $API_KEY" -d "$REGISTER_DATA" "$SERVER_URL/api/v1/agents/register") if echo $REGISTER_RESULT | grep -q "success"; then echo "代理注册成功" else echo "警告: 代理注册失败,请检查服务器连接和API密钥" echo "注册响应: $REGISTER_RESULT" fi } # 清理临时文件 cleanup() { echo "清理临时文件..." rm -f /tmp/agent.tar.gz rm -f /tmp/agent.sha256 } # 主函数 in() { echo "开始安装安全审计代理 v$AGENT_VERSION..." # 检查是否为root用户 if [ "$(id -u)" != "0" ]; then echo "错误: 请使用root用户运行此脚本" exit 1 fi check_requirements create_directories download_agent install_dependencies configure_agent install_service register_agent cleanup echo "安全审计代理安装完成" echo "代理ID: $AGENT_ID" echo "配置目录: $CONFIG_DIR" echo "日志目录: $LOG_DIR" } # 执行主函数 inDocker方式安装
对于支持Docker的环境,也可以使用容器化部署:
bash
#!/bin/bash # 设置变量 SERVER_URL="https://security-audit-server.example.com" API_KEY="your-api-key-here" AGENT_VERSION="1.0.0" CONFIG_DIR="/etc/security-audit-agent" LOG_DIR="/var/log/security-audit-agent" # 检查Docker if ! com nd -v docker >/dev/null 2>&1; then echo "错误: 未找到Docker命令" exit 1 fi # 创建配置目录 mkdir -p $CONFIG_DIR mkdir -p $LOG_DIR # 生成配置文件 cat > $CONFIG_DIR/agent.conf << EOF [agent] version = $AGENT_VERSION log_dir = /logs [server] url = $SERVER_URL api_key = $API_KEY [logging] level = INFO x_size = 10MB backup_count = 5 [security] ssl_verify = true EOF # 生成唯一标识 AGENT_ID=$(hostname -f | sha256sum | cut -d' ' -f1) echo "agent_id = $AGENT_ID" >> $CONFIG_DIR/agent.conf # 拉取Docker镜像 docker pull security-audit/agent:$AGENT_VERSION # 运行容器 docker run -d --name security-audit-agent --restart always --network host --privileged -v $CONFIG_DIR:/etc/agent -v $LOG_DIR:/logs -v /:/host:ro -v /var/run/docker.sock:/var/run/docker.sock -e AGENT_ID=$AGENT_ID -e SERVER_URL=$SERVER_URL -e API_KEY=$API_KEY security-audit/agent:$AGENT_VERSION # 检查容器 情形 if [ $(docker ps -q -f name=security-audit-agent | wc -l) -eq 0 ]; then echo "错误: 容器启动失败" docker logs security-audit-agent exit 1 fi echo "安全审计代理容器启动成功" echo "代理ID: $AGENT_ID"客户端代理配置
安装完成后,可以根据需要进一步配置客户端代理的行为:
基本配置
下面内容是客户端代理的基本配置选项:
ini
[agent] # 基本设置 version = 1.0.0 install_dir = /opt/security-audit-agent log_dir = /var/log/security-audit-agent agent_id = auto-generated-unique-id hostname = auto-detected-hostname # 性能设置 x_cpu_percent = 30 x_memory_mb = 200 io_priority = low [server] # 服务器连接设置 url = https://security-audit-server.example.com api_key = your-api-key-here connection_timeout = 30 retry_interval = 60 x_retries = 5 [security] # 安全设置 ssl_verify = true encrypt_data = true encryption_key = auto-generated-key [logging] # 日志设置 level = INFO x_size = 10MB backup_count = 5 syslog_enabled = false [monitoring] # 监控设置 enabled = true interval = 60 idle_cpu_threshold = 80 collect_metrics = true [scan] # 扫描设置 allow_auto_scan = true x_concurrent_checks = 5 scan_schedule = 0 2 * * * # 每天凌晨2点 [remediation] # 修复设置 allow_auto_remediation = false require_approval = true backup_before_change = true高 质量配置
对于特定需求,还可以进行更 高 质量的配置:
ini
[agent.modules] # 模块启用/禁用设置 baseline_check = true vulnerability_scan = true compliance_check = true file_integrity_monitoring = true log_monitoring = true process_monitoring = true [baseline.custom_checks] # 自定义基线检查 enabled = true check_dir = /etc/security-audit-agent/custom_checks [vulnerability.exclusions] # 漏洞扫描排除项 cve_ids = CVE- 2024-1234,CVE- 2024-5678 packages = test-package,dev-package [file_integrity] # 文件完整性监控设置 enabled = true monitor_interval = 300 include_paths = /etc/,/bin/,/ in/,/usr/bin/,/usr/ in/ exclude_paths = /etc/resolv.conf,/etc/hosts hash_algorithm = sha256 [log_monitoring] # 日志监控设置 enabled = true log_files = /var/log/auth.log,/var/log/syslog,/var/log/secure x_log_size = 100MB [network] # 网络设置 proxy_enabled = false proxy_url = http://proxy.example.com:3128 proxy_user = proxy_user proxy_password = proxy_password [offline_mode] # 离线模式设置 enabled = false data_retention_days = 7 x_storage_size = 1GB客户端代理通信
客户端代理与服务器的通信基于HTTPS协议,使用REST API进行数据交换。主要的通信流程包括:
初始注册:
代理首次启动时向服务器注册 提供主机信息和唯一标识 接收服务器分配的配置和初始任务
定期检查:
代理定期向服务器检查新任务 报告自身 情形和基本 体系指标 接收配置更新或命令
任务执行:
接收扫描或检查任务 执行本地操作并收集 结局 将 结局上传至服务器
事件上报:
检测到安全事件时立即上报 提供详细的事件信息和上下文 可能接收服务器的响应命令
离线操作:
网络中断时缓存数据到本地 连接恢复后自动同步数据 支持基本的离线检查功能
通信采用TLS加密,并使用API密钥或证书进行身份验证,确保数据传输的安全性和完整性。
14.5 服务器端功能设计
本节将详细介绍Linux 体系安全审计平台服务器端的功能设计和实现 技巧,包括Django配置和具体的功能实现 技巧。
14.5.1 Django 配置
Django 一个强大的Python Web框架,我们用它来构建安全审计平台的服务器端应用。 下面内容是主要的Django配置和项目结构。
项目结构
stylus
security_audit_platform/ ├── nage.py ├── security_audit/ │ ├── __init__.py │ ├── asgi.py │ ├── celery.py │ ├── settings/ │ │ ├── __init__.py │ │ ├── base.py │ │ ├── development.py │ │ └── production.py │ ├── urls.py │ └── wsgi.py ├── apps/ │ ├── accounts/ │ ├── agents/ │ ├── alerts/ │ ├── api/ │ ├── assets/ │ ├── baseline/ │ ├── compliance/ │ ├── dashboard/ │ ├── reports/ │ ├── scanning/ │ └── vulnerabilities/ ├── static/ ├── templates/ └── requirements/ ├── base.txt ├── development.txt └── production.txt核心设置
下面内容是Django项目的核心设置(settings/base.py):
python
import os from pathlib import Path from datetime import timedelta # 构建路径 BASE_DIR = Path(__file__).resolve().parent.parent.parent # 安全设置 SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY', 'your-secret-key-here') DEBUG = False ALLOWED_HOSTS = [] # 应用定义 INSTALLED_APPS = [ # Django应用 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', # 第三方应用 'rest_framework', 'rest_framework.authtoken', 'corsheaders', 'django_celery_beat', 'django_celery_results', 'django_filters', 'drf_yasg', # 项目应用 'apps.accounts', 'apps.agents', 'apps.alerts', 'apps.api', 'apps.assets', 'apps.baseline', 'apps.compliance', 'apps.dashboard', 'apps.reports', 'apps.scanning', 'apps.vulnerabilities', ] MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'corsheaders.middleware.CorsMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', ] ROOT_URLCONF = 'security_audit.urls' TEMPLATES = [ { 'BACKEND': 'django.template.backends.django.DjangoTemplates', 'DIRS': [BASE_DIR / 'templates'], 'APP_DIRS': True, 'OPTIONS': { 'context_processors': [ 'django.template.context_processors.debug', 'django.template.context_processors.request', 'django.contrib.auth.context_processors.auth', 'django.contrib.messages.context_processors.messages', ], }, }, ] WSGI_APPLICATION = 'security_audit.wsgi.application' # 数据库配置 DATABASES = { 'default': { 'ENGINE': 'django.db.backends.postgresql', 'NAME': os.environ.get('DB_NAME', 'security_audit'), 'USER': os.environ.get('DB_USER', 'postgres'), 'PASSWORD': os.environ.get('DB_PASSWORD', ''), 'HOST': os.environ.get('DB_HOST', 'localhost'), 'PORT': os.environ.get('DB_PORT', '5432'), } } # 缓存配置 CACHES = { 'default': { 'BACKEND': 'django_redis.cache.RedisCache', 'LOCATION': os.environ.get('REDIS_URL', 'redis://localhost:6379/1'), 'OPTIONS': { 'CLIENT_CLASS': 'django_redis.client.DefaultClient', } } } # 用户认证 AUTH_USER_MODEL = 'accounts.User' AUTH_PASSWORD_VALIDATORS = [ {'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator'}, {'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator'}, {'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator'}, {'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator'}, ] # 国际化 LANGUAGE_CODE = 'en-us' TIME_ZONE = 'UTC' USE_I18N = True USE_L10N = True USE_TZ = True # 静态文件 STATIC_URL = '/static/' STATIC_ROOT = BASE_DIR / 'staticfiles' STATICFILES_DIRS = [BASE_DIR / 'static'] # 媒体文件 MEDIA_URL = '/media/' MEDIA_ROOT = BASE_DIR / 'media' # REST Framework配置 REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': ( 'rest_framework.authentication.TokenAuthentication', 'rest_framework.authentication.SessionAuthentication', ), 'DEFAULT_PERMISSION_CLASSES': ( 'rest_framework.permissions.IsAuthenticated', ), 'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination', 'PAGE_SIZE': 20, 'DEFAULT_FILTER_BACKENDS': ( 'django_filters.rest_framework.DjangoFilterBackend', 'rest_framework.filters.SearchFilter', 'rest_framework.filters.OrderingFilter', ), 'DEFAULT_RENDERER_CLASSES': ( 'rest_framework.renderers.JSONRenderer', 'rest_framework.renderers.BrowsableAPIRenderer', ), 'DEFAULT_THROTTLE_CLASSES': ( 'rest_framework.throttling.AnonRateThrottle', 'rest_framework.throttling.UserRateThrottle' ), 'DEFAULT_THROTTLE_RATES': { 'anon': '100/day', 'user': '1000/day' } } # Celery配置 CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://localhost') CELERY_RESULT_BACKEND = 'django-db' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = TIME_ZONE CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # CORS配置 CORS_ALLOW_ALL_ORIGINS = False CORS_ALLOWED_ORIGINS = [ 'http://localhost:8000', 'http://localhost:3000', ] # 日志配置 LOGGING = { 'version': 1, 'disable_existing_loggers': False, 'for tters': { 'verbose': { 'for t': '{levelname} {asctime} {module} {process:d} {thread:d} {message}', 'style': '{', }, ' ': { 'for t': '{levelname} {message}', 'style': '{', }, }, 'handlers': { 'console': { 'level': 'INFO', 'class': 'logging.StreamHandler', 'for tter': 'verbose' }, 'file': { 'level': 'INFO', 'class': 'logging.handlers.RotatingFileHandler', 'filename': BASE_DIR / 'logs/security_audit.log', ' xBytes': 1024*1024*10, # 10 MB 'backupCount': 10, 'for tter': 'verbose', }, }, 'loggers': { 'django': { 'handlers': ['console', 'file'], 'level': 'INFO', 'propagate': True, }, 'security_audit': { 'handlers': ['console', 'file'], 'level': 'INFO', 'propagate': False, }, }, } # Swagger设置 SWAGGER_SETTINGS = { 'SECURITY_DEFINITIONS': { 'Token': { 'type': 'apiKey', 'name': 'Authorization', 'in': 'header' } }, 'USE_SESSION_AUTH': True, 'LOGIN_URL': '/admin/login/', 'LOGOUT_URL': '/admin/logout/', }URL配置
主要的URL配置(urls.py):
python
from django.contrib import admin from django.urls import path, include from django.conf import settings from django.conf.urls.static import static from rest_framework import permissions from drf_yasg.views import get_sche _view from drf_yasg import openapi # API文档配置 sche _view = get_sche _view( openapi.Info( title="Security Audit API", default_version='v1', description="API for Linux Security Audit Platform", terms_of_service="https:// .example.com/terms/", contact=openapi.Contact(e il="contact@example.com"), license=openapi.License(name="BSD License"), ), public=True, permission_classes=(permissions.IsAuthenticated,), ) urlpatterns = [ # Admin path('admin/', admin.site.urls), # API文档 path('swagger/', sche _view.with_ui('swagger', cache_timeout=0), name='sche -swagger-ui'), path('redoc/', sche _view.with_ui('redoc', cache_timeout=0), name='sche -redoc'), # API端点 path('api/v1/', include('apps.api.urls')), # 应用URL path('accounts/', include('apps.accounts.urls')), path('agents/', include('apps.agents.urls')), path('assets/', include('apps.assets.urls')), path('baseline/', include('apps.baseline.urls')), path('compliance/', include('apps.compliance.urls')), path('scanning/', include('apps.scanning.urls')), path('vulnerabilities/', include('apps.vulnerabilities.urls')), path('alerts/', include('apps.alerts.urls')), path('reports/', include('apps.reports.urls')), # 主页和仪表板 path('', include('apps.dashboard.urls')), ] # 在开发环境中提供静态和媒体文件 if settings.DEBUG: urlpatterns += static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)14.5.2 功能实现 技巧
下面将详细介绍 几许核心功能模块的实现 技巧。
资产管理模块
资产管理是安全审计的基础,负责管理所有被监控的Linux服务器信息。
模型定义:
python
# apps/assets/models.py from django.db import models from django.utils.translation import gettext_lazy as _ class Host(models.Model): """主机模型""" STATUS_CHOICES = ( ('online', _('Online')), ('offline', _('Offline')), (' intenance', _('Maintenance')), ) hostname = models.CharField(_('Hostname'), x_length=255) ip_address = models.GenericIPAddressField(_('IP Address')) os_type = models.CharField(_('OS Type'), x_length=100, blank=True) os_version = models.CharField(_('OS Version'), x_length=100, blank=True) kernel_version = models.CharField(_('Kernel Version'), x_length=100, blank=True) architecture = models.CharField(_('Architecture'), x_length=50, blank=True) status = models.CharField(_('Status'), x_length=20, choices=STATUS_CHOICES, default='offline') agent_version = models.CharField(_('Agent Version'), x_length=50, blank=True) last_seen = models.DateTimeField(_('Last Seen'), null=True, blank=True) created_at = models.DateTimeField(_('Created At'), auto_now_add=True) updated_at = models.DateTimeField(_('Updated At'), auto_now=True) def __str__(self): return f"{self.hostname} ({self.ip_address})" class Meta: verbose_name = _('Host') verbose_name_plural = _('Hosts') ordering = ['hostname'] unique_together = [['hostname', 'ip_address']] class HostGroup(models.Model): """主机组模型""" name = models.CharField(_('Name'), x_length=100) description = models.TextField(_('Description'), blank=True) hosts = models.ManyToManyField(Host, related_name='groups', blank=True) created_at = models.DateTimeField(_('Created At'), auto_now_add=True) updated_at = models.DateTimeField(_('Updated At'), auto_now=True) def __str__(self): return self.name class Meta: verbose_name = _('Host Group') verbose_name_plural = _('Host Groups') ordering = ['name'] class Service(models.Model): """应用服务模型""" host = models.ForeignKey(Host, on_delete=models.CASCADE, related_name='services') name = models.CharField(_('Name'), x_length=100) service_type = models.CharField(_('Service Type'), x_length=50) version = models.CharField(_('Version'), x_length=50, blank=True) status = models.CharField(_('Status'), x_length=20) port = models.IntegerField(_('Port'), null=True, blank=True) path = models.CharField(_('Path'), x_length=255, blank=True) created_at = models.DateTimeField(_('Created At'), auto_now_add=True) updated_at = models.DateTimeField(_('Updated At'), auto_now=True) def __str__(self): return f"{self.name} ({self.host.hostname})" class Meta: verbose_name = _('Service') verbose_name_plural = _('Services') ordering = ['host', 'name'] unique_together = [['host', 'name', 'port']]序列化器:
python
# apps/assets/serializers.py from rest_framework import serializers from .models import Host, HostGroup, Service class ServiceSerializer(serializers.ModelSerializer): class Meta: model = Service fields = '__all__' read_only_fields = ['created_at', 'updated_at'] class HostSerializer(serializers.ModelSerializer): services = ServiceSerializer( ny=True, read_only=True) groups = serializers.StringRelatedField( ny=True, read_only=True) class Meta: model = Host fields = '__all__' read_only_fields = ['created_at', 'updated_at'] class HostGroupSerializer(serializers.ModelSerializer): hosts_count = serializers.SerializerMethodField() class Meta: model = HostGroup fields = '__all__' read_only_fields = ['created_at', 'updated_at'] def get_hosts_count(self, obj): return obj.hosts.count()视图:
python
# apps/assets/views.py from rest_framework import viewsets, filters, status from rest_framework.decorators import action from rest_framework.response import Response from django_filters.rest_framework import DjangoFilterBackend from .models import Host, HostGroup, Service from .serializers import HostSerializer, HostGroupSerializer, ServiceSerializer from .filters import HostFilter, ServiceFilter class HostViewSet(viewsets.ModelViewSet): """主机视图集""" queryset = Host.objects.all() serializer_class = HostSerializer filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter] filterset_class = HostFilter search_fields = ['hostname', 'ip_address', 'os_type', 'os_version'] ordering_fields = ['hostname', 'ip_address', 'status', 'created_at', 'last_seen'] @action(detail=True, methods=['post']) def update_status(self, request, pk=None): """更新主机 情形""" host = self.get_object() status = request.data.get('status') if status not in [s[0] for s in Host.STATUS_CHOICES]: return Response({'error': 'Invalid status'}, status=status.HTTP_400_BAD_REQUEST) host.status = status host.save() return Response(HostSerializer(host).data) @action(detail=True, methods=['get']) def services(self, request, pk=None): """获取主机服务列表""" host = self.get_object() services = host.services.all() page = self.paginate_queryset(services) if page is not None: serializer = ServiceSerializer(page, ny=True) return self.get_paginated_response(serializer.data) serializer = ServiceSerializer(services, ny=True) return Response(serializer.data) class HostGroupViewSet(viewsets.ModelViewSet): """主机组视图集""" queryset = HostGroup.objects.all() serializer_class = HostGroupSerializer filter_backends = [filters.SearchFilter, filters.OrderingFilter] search_fields = ['name', 'description'] ordering_fields = ['name', 'created_at'] @action(detail=True, methods=['get']) def hosts(self, request, pk=None): """获取组内主机列表""" group = self.get_object() hosts = group.hosts.all() page = self.paginate_queryset(hosts) if page is not None: serializer = HostSerializer(page, ny=True) return self.get_paginated_response(serializer.data) serializer = HostSerializer(hosts, ny=True) return Response(serializer.data) @action(detail=True, methods=['post']) def add_hosts(self, request, pk=None): """添加主机到组""" group = self.get_object() host_ids = request.data.get('host_ids', []) hosts = Host.objects.filter(id__in=host_ids) group.hosts.add(*hosts) return Response({'status': 'hosts added to group'}) @action(detail=True, methods=['post']) def remove_hosts(self, request, pk=None): """从组中移除主机""" group = self.get_object() host_ids = request.data.get('host_ids', []) hosts = Host.objects.filter(id__in=host_ids) group.hosts.remove(*hosts) return Response({'status': 'hosts removed from group'}) class ServiceViewSet(viewsets.ModelViewSet): """服务视图集""" queryset = Service.objects.all() serializer_class = ServiceSerializer filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter] filterset_class = ServiceFilter search_fields = ['name', 'service_type', 'version'] ordering_fields = ['name', 'service_type', 'status', 'port']URL配置:
python
# apps/assets/urls.py from django.urls import path, include from rest_framework.routers import DefaultRouter from .views import HostViewSet, HostGroupViewSet, ServiceViewSet router = DefaultRouter() router.register(r'hosts', HostViewSet) router.register(r'groups', HostGroupViewSet) router.register(r'services', ServiceViewSet) app_name = 'assets' urlpatterns = [ path('', include(router.urls)), ]安全基线检查模块
安全基线检查是审计平台的核心功能,用于检查 体系是否符合预定义的安全标准。
模型定义:
python
# apps/baseline/models.py from django.db import models from django.utils.translation import gettext_lazy as _ from django.contrib.auth import get_user_model from apps.assets.models import Host, HostGroup User = get_user_model() class SecurityPolicy(models.Model): """安全策略模型""" POLICY_TYPES = ( ('baseline', _('Baseline')), ('vulnerability', _('Vulnerability')), ('compliance', _('Compliance')), ) name = models.CharField(_('Name'), x_length=100) description = models.TextField(_('Description'), blank=True) policy_type = models.CharField(_('Policy Type'), x_length=20, choices=POLICY_TYPES) is_active = models.BooleanField(_('Is Active'), default=True) created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, related_name='created_policies') created_at = models.DateTimeField(_('Created At'), auto_now_add=True) updated_at = models.DateTimeField(_('Updated At'), auto_now=True) def __str__(self): return self.name class Meta: verbose_name = _('Security Policy') verbose_name_plural = _('Security Policies') ordering = ['name'] class CheckItem(models.Model): """检查项模型""" SEVERITY_CHOICES = ( ('critical', _('Critical')), ('high', _('High')), ('medium', _('Medium')), ('low', _('Low')), ('info', _('Info')), ) CHECK_TYPES = ( ('com nd', _('Com nd')), ('file', _('File')), ('service', _('Service')), ('process', _('Process')), ('package', _('Package')), ('custom', _('Custom')), ) policy = models.ForeignKey(SecurityPolicy, on_delete=models.CASCADE, related_name='check_items') code = models.CharField(_('Code'), x_length=50) name = models.CharField(_('Name'), x_length=200) description = models.TextField(_('Description'), blank=True) category = models.CharField(_('Category'), x_length=100) severity = models.CharField(_('Severity'), x_length=20, choices=SEVERITY_CHOICES) check_type = models.CharField(_('Check Type'), x_length=20, choices=CHECK_TYPES) check_script = models.TextField(_('Check Script')) expected_result = models.TextField(_('Expected Result'), blank=True) remediation = models.TextField(_('Remediation'), blank=True) created_at = models.DateTimeField(_('Created At'), auto_now_add=True) updated_at = models.DateTimeField(_('Updated At'), auto_now=True) def __str__(self): return f"{self.code} - {self.name}" class Meta: verbose_name = _('Check Item') verbose_name_plural = _('Check Items') ordering = ['policy', 'code'] unique_together = [['policy', 'code']] class ScanTask(models.Model): """扫描任务模型""" STATUS_CHOICES = ( ('pending', _('Pending')), ('running', _('Running')), ('completed', _('Completed')), ('failed', _('Failed')), ('canceled', _('Canceled')), ) name = models.CharField(_('Name'), x_length=100) policy = models.ForeignKey(SecurityPolicy, on_delete=models.CASCADE, related_name='scan_tasks') status = models.CharField(_('Status'), x_length=20, choices=STATUS_CHOICES, default='pending') scheduled_time = models.DateTimeField(_('Scheduled Time'), null=True, blank=True) start_time = models.DateTimeField(_('Start Time'), null=True, blank=True) end_time = models.DateTimeField(_('End Time'), null=True, blank=True) created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, related_name='created_tasks') created_at = models.DateTimeField(_('Created At'), auto_now_add=True) updated_at = models.DateTimeField(_('Updated At'), auto_now=True) def __str__(self): return self.name class Meta: verbose_name = _('Scan Task') verbose_name_plural = _('Scan Tasks') ordering = ['-created_at'] class ScanTarget(models.Model): """扫描目标模型""" STATUS_CHOICES = ( ('pending', _('Pending')), ('running', _('Running')), ('completed', _('Completed')), ('failed', _('Failed')), ('skipped', _('Skipped')), ) task = models.ForeignKey(ScanTask, on_delete=models.CASCADE, related_name='targets') host = models.ForeignKey(Host, on_delete=models.CASCADE, related_name='scan_targets', null=True, blank=True) group = models.ForeignKey(HostGroup, on_delete=models.CASCADE, related_name='scan_targets', null=True, blank=True) status = models.CharField(_('Status'), x_length=20, choices=STATUS_CHOICES, default='pending') created_at = models.DateTimeField(_('Created At'), auto_now_add=True) def __str__(self): if self.host: return f"Task {self.task.id} - Host {self.host.hostname}" elif self.group: return f"Task {self.task.id} - Group {self.group.name}" return f"Task {self.task.id} - Unknown target" class Meta: verbose_name = _('Scan Target') verbose_name_plural = _('Scan Targets') ordering = ['task', 'created_at'] class ScanResult(models.Model): """扫描 结局模型""" STATUS_CHOICES = ( ('pass', _('Pass')), ('fail', _('Fail')), ('warning', _('Warning')), ('not_applicable', _('Not Applicable')), ('error', _('Error')), ) task = models.ForeignKey(ScanTask, on_delete=models.CASCADE, related_name='results') host = models.ForeignKey(Host, on_delete=models.CASCADE, related_name='scan_results') check = models.ForeignKey(CheckItem, on_delete=models.CASCADE, related_name='scan_results') status = models.CharField(_('Status'), x_length=20, choices=STATUS_CHOICES) actual_value = models.TextField(_('Actual Value'), blank=True) details = models.TextField(_('Details'), blank=True) scanned_at = models.DateTimeField(_('Scanned At'), auto_now_add=True) created_at = models.DateTimeField(_('Created At'), auto_now_add=True) def __str__(self): return f"Task {self.task.id} - Host {self.host.hostname} - Check {self.check.code}" class Meta: verbose_name = _('Scan Result') verbose_name_plural = _('Scan Results') ordering = ['task', 'host', 'check'] unique_together = [['task', 'host', 'check']]任务调度和执行:
python
# apps/baseline/tasks.py import logging from datetime import datetime from celery import shared_task from django.utils import timezone from django.db import transaction from .models import ScanTask, ScanTarget, ScanResult, CheckItem from apps.agents.client import AgentClient logger = logging.getLogger('security_audit.baseline') @shared_task def run_scan_task(task_id): """执行扫描任务""" try: task = ScanTask.objects.get(id=task_id) # 更新任务 情形 task.status = 'running' task.start_time = timezone.now() task.save() logger.info(f"Starting scan task {task.id} - {task.name}") # 处理扫描目标 targets = task.targets.all() total_targets = targets.count() processed_targets = 0 for target in targets: try: if target.host: # 单个主机目标 process_host_target(task, target, target.host) elif target.group: # 主机组目标 for host in target.group.hosts.all(): process_host_target(task, target, host) target.status = 'completed' target.save() processed_targets += 1 # 更新任务进度 logger.info(f"Processed {processed_targets}/{total_targets} targets for task {task.id}") except Exception as e: logger.error(f"Error processing target {target.id} for task {task.id}: {str(e)}") target.status = 'failed' target.save() # 更新任务 情形 task.status = 'completed' task.end_time = timezone.now() task.save() logger.info(f"Completed scan task {task.id} - {task.name}") # 处理 结局,生成告警等 process_scan_results.delay(task_id) except ScanTask.DoesNotExist: logger.error(f"Scan task {task_id} not found") except Exception as e: logger.error(f"Error running scan task {task_id}: {str(e)}") # 更新任务 情形为失败 try: task = ScanTask.objects.get(id=task_id) task.status = 'failed' task.end_time = timezone.now() task.save() except: pass def process_host_target(task, target, host): """处理单个主机的扫描""" logger.info(f"Processing host {host.hostname} for task {task.id}") # 更新目标 情形 target.status = 'running' target.save() # 获取检查项 check_items = task.policy.check_items.all() # 检查主机是否在线 agent_client = AgentClient(host) if not agent_client.is_online(): logger.warning(f"Host {host.hostname} is offline, skipping") target.status = 'skipped' target.save() return # 执行检查 for check_item in check_items: try: # 执行检查脚本 result = agent_client.execute_check(check_item) # 保存 结局 with transaction.atomic(): scan_result = ScanResult( task=task, host=host, check=check_item, status=result['status'], actual_value=result.get('actual_value', ''), details=result.get('details', '') ) scan_result.save() logger.info(f"Check {check_item.code} on {host.hostname}: {result['status']}") except Exception as e: logger.error(f"Error executing check {check_item.code} on {host.hostname}: {str(e)}") # 记录错误 结局 with transaction.atomic(): scan_result = ScanResult( task=task, host=host, check=check_item, status='error', details=f"Error executing check: {str(e)}" ) scan_result.save() @shared_task def process_scan_results(task_id): """处理扫描 结局,生成告警等""" from apps.alerts.models import Alert try: task = ScanTask.objects.get(id=task_id) results = task.results.filter(status='fail') for result in results: # 创建告警 alert = Alert( result_id=result.id, host=result.host, alert_type='baseline_failure', severity=result.check.severity, status='new', message=f"Baseline check failed: {result.check.name}", details=f"Check: {result.check.code} Details: {result.details} Remediation: {result.check.remediation}" ) alert.save() logger.info(f"Created alert for failed check {result.check.code} on {result.host.hostname}") except Exception as e: logger.error(f"Error processing results for task {task_id}: {str(e)}")视图实现:
python
# apps/baseline/views.py from rest_framework import viewsets, filters, status from rest_framework.decorators import action from rest_framework.response import Response from django_filters.rest_framework import DjangoFilterBackend from django.utils import timezone from .models import SecurityPolicy, CheckItem, ScanTask, ScanTarget, ScanResult from .serializers import ( SecurityPolicySerializer, CheckItemSerializer, ScanTaskSerializer, ScanTargetSerializer, ScanResultSerializer ) from .tasks import run_scan_task class SecurityPolicyViewSet(viewsets.ModelViewSet): """安全策略视图集""" queryset = SecurityPolicy.objects.all() serializer_class = SecurityPolicySerializer filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter] filterset_fields = ['policy_type', 'is_active'] search_fields = ['name', 'description'] ordering_fields = ['name', 'created_at'] def perform_create(self, serializer): serializer.save(created_by=self.request.user) @action(detail=True, methods=['get']) def check_items(self, request, pk=None): """获取策略的检查项""" policy = self.get_object() check_items = policy.check_items.all() page = self.paginate_queryset(check_items) if page is not None: serializer = CheckItemSerializer(page, ny=True) return self.get_paginated_response(serializer.data) serializer = CheckItemSerializer(check_items, ny=True) return Response(serializer.data) class CheckItemViewSet(viewsets.ModelViewSet): """检查项视图集""" queryset = CheckItem.objects.all() serializer_class = CheckItemSerializer filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter] filterset_fields = ['policy', 'severity', 'check_type', 'category'] search_fields = ['code', 'name', 'description'] ordering_fields = ['code', 'name', 'severity'] class ScanTaskViewSet(viewsets.ModelViewSet): """扫描任务视图集""" queryset = ScanTask.objects.all() serializer_class = ScanTaskSerializer filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter] filterset_fields = ['policy', 'status', 'created_by'] search_fields = ['name'] ordering_fields = ['name', 'created_at', 'scheduled_time', 'start_time', 'end_time'] def perform_create(self, serializer): serializer.save(created_by=self.request.user) @action(detail=True, methods=['post']) def run(self, request, pk=None): """执行扫描任务""" task = self.get_object() # 检查任务 情形 if task.status in ['running', 'pending']: return Response({'error': 'Task is already running or pending'}, status=status.HTTP_400_BAD_REQUEST) # 更新任务 情形 task.status = 'pending' task.save() # 启动异步任务 run_scan_task.delay(task.id) return Response({'status': 'Task started'}) @action(detail=True, methods=['post']) def cancel(self, request, pk=None): """取消扫描任务""" task = self.get_object() # 只能取消等待中的任务 if task.status != 'pending': return Response({'error': 'Only pending tasks can be canceled'}, status=status.HTTP_400_BAD_REQUEST) # 更新任务 情形 task.status = 'canceled' task.end_time = timezone.now() task.save() return Response({'status': 'Task canceled'}) @action(detail=True, methods=['get']) def results(self, request, pk=None): """获取任务 结局""" task = self.get_object() results = task.results.all() # 过滤参数 status_filter = request.query_params.get('status') if status_filter: results = results.filter(status=status_filter) host_filter = request.query_params.get('host') if host_filter: results = results.filter(host__id=host_filter) severity_filter = request.query_params.get('severity') if severity_filter: results = results.filter(check__severity=severity_filter) page = self.paginate_queryset(results) if page is not None: serializer = ScanResultSerializer(page, ny=True) return self.get_paginated_response(serializer.data) serializer = ScanResultSerializer(results, ny=True) return Response(serializer.data) @action(detail=True, methods=['get']) def sum ry(self, request, pk=None): """获取任务 简介""" task = self.get_object() # 获取各 情形的 结局计数 status_counts = {} for status_choice in ScanResult.STATUS_CHOICES: status_code = status_choice[0] status_counts[status_code] = task.results.filter(status=status_code).count() # 获取各严重级别的失败计数 severity_counts = {} for severity_choice in CheckItem.SEVERITY_CHOICES: severity_code = severity_choice[0] severity_counts[severity_code] = task.results.filter( status='fail', check__severity=severity_code ).count() # 计算总体得分 total_checks = task.results.count() passed_checks = task.results.filter(status='pass').count() score = 0 if total_checks > 0: score = (passed_checks / total_checks) * 100 return Response({ 'task_id': task.id, 'task_name': task.name, 'status': task.status, 'start_time': task.start_time, 'end_time': task.end_time, 'total_hosts': task.results.values('host').distinct().count(), 'total_checks': total_checks, 'status_counts': status_counts, 'severity_counts': severity_counts, 'score': round(score, 2) })怎样样?经过上面的分析代码,我们实现了Linux 体系安全审计平台的核心功能。这个平台可以帮助管理员全面了解 体系安全状况,及时发现并处理潜在安全风险,确保 体系符合安全基线标准。
随着网络安全威胁的不断演变,这样的安全审计平台将在企业IT基础设施保护中发挥越来越重要的 影响。通过持续更新安全检查 制度和漏洞数据库,平台可以保持对最新安全威胁的防护能力,为企业提供全方位的安全保障。