告别低效排查:用ZoomEye API + Python脚本自动化你的资产发现与监控流程
2026/6/26 0:45:21 网站建设 项目流程

告别低效排查:用ZoomEye API + Python脚本自动化你的资产发现与监控流程

在网络安全和运维领域,资产发现与监控是日常工作中最基础却最耗时的环节之一。传统的手动搜索方式不仅效率低下,还容易遗漏关键信息。想象一下,当你需要监控公司数百个IP段中的新开放端口或服务变更时,手动操作几乎是一场噩梦。这正是自动化工具链的价值所在——将重复性劳动交给机器,让工程师专注于真正需要人类智慧的决策环节。

ZoomEye作为专业的网络空间测绘引擎,其API接口为自动化流程提供了强大支持。结合Python的灵活性和丰富的生态库,我们可以构建一套完整的资产监控系统,实现从发现、分析到告警的全流程自动化。本文将带你从零开始,开发一套可集成到现有运维体系中的自动化工具链。

1. 环境准备与ZoomEye API基础

1.1 获取API访问权限

首先需要注册ZoomEye账号并获取API Key:

  1. 访问ZoomEye官网完成注册
  2. 进入个人中心找到"API"选项卡
  3. 生成专属API Key并妥善保存

注意:免费账号有API调用次数限制,企业级需求建议考虑商业授权

1.2 安装必要的Python库

推荐使用Python 3.8+环境,安装以下核心依赖:

pip install requests pandas schedule python-dotenv

各库的作用:

  • requests:处理HTTP API请求
  • pandas:数据分析与报表生成
  • schedule:定时任务管理
  • python-dotenv:环境变量管理

1.3 API基础请求示例

下面是一个简单的API测试脚本,验证环境配置是否正确:

import requests from dotenv import load_dotenv import os load_dotenv() API_KEY = os.getenv('ZOOMEYE_API_KEY') headers = { "API-KEY": API_KEY } response = requests.get( "https://api.zoomeye.org/resources-info", headers=headers ) print(response.json())

将API Key保存在项目根目录的.env文件中:

ZOOMEYE_API_KEY=your_api_key_here

2. 构建自动化资产发现系统

2.1 设计资产搜索策略

根据不同的监控需求,可以设计多种搜索策略:

搜索类型适用场景示例查询
IP段监控企业内网资产发现cidr:192.168.1.0/24
服务发现特定服务版本监控app:nginx port:80
漏洞扫描CVE相关设备发现app:Apache Tomcat ver:9.0.0

2.2 实现基础搜索功能

以下代码实现了基本的资产搜索功能:

def search_assets(query, page=1): url = "https://api.zoomeye.org/host/search" params = { "query": query, "page": page } try: response = requests.get(url, headers=headers, params=params) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"API请求失败: {e}") return None

2.3 结果解析与存储

将API返回的JSON数据转换为结构化DataFrame:

import pandas as pd def parse_results(data): records = [] for item in data.get('matches', []): record = { 'ip': item.get('ip'), 'port': item.get('portinfo', {}).get('port'), 'service': item.get('portinfo', {}).get('service'), 'app': item.get('portinfo', {}).get('app'), 'version': item.get('portinfo', {}).get('version'), 'timestamp': item.get('timestamp') } records.append(record) return pd.DataFrame(records)

3. 实现资产变更监控系统

3.1 设计变更检测算法

核心思路是通过定期扫描对比历史数据,检测以下变更类型:

  • 新增资产:之前不存在的IP:Port组合
  • 服务变更:同一IP端口上的服务类型或版本变化
  • 资产下线:之前存在的服务不再响应

3.2 实现差异检测功能

def detect_changes(current_df, previous_df): # 合并两期数据 merged = pd.merge( current_df, previous_df, on=['ip', 'port'], how='outer', suffixes=('_current', '_previous') ) # 识别变更类型 changes = [] for _, row in merged.iterrows(): if pd.isna(row['service_previous']): changes.append({ 'type': '新增', 'ip': row['ip'], 'port': row['port'], 'detail': f"新增服务: {row['service_current']}" }) elif pd.isna(row['service_current']): changes.append({ 'type': '下线', 'ip': row['ip'], 'port': row['port'], 'detail': f"服务下线: {row['service_previous']}" }) elif row['service_current'] != row['service_previous']: changes.append({ 'type': '变更', 'ip': row['ip'], 'port': row['port'], 'detail': f"{row['service_previous']} → {row['service_current']}" }) return pd.DataFrame(changes)

3.3 定时任务集成

使用schedule库实现定时扫描:

import schedule import time def monitoring_job(): print(f"开始执行监控任务: {time.ctime()}") current_data = search_assets("cidr:192.168.1.0/24") current_df = parse_results(current_data) try: previous_df = pd.read_csv('last_scan.csv') except FileNotFoundError: previous_df = pd.DataFrame() if not previous_df.empty: changes = detect_changes(current_df, previous_df) if not changes.empty: alert_on_changes(changes) current_df.to_csv('last_scan.csv', index=False) # 每天凌晨2点执行 schedule.every().day.at("02:00").do(monitoring_job) while True: schedule.run_pending() time.sleep(60)

4. 告警与报表系统集成

4.1 多通道告警实现

根据企业环境选择适当的告警方式:

  • 邮件告警:适合非紧急通知
  • 即时通讯工具:如企业微信、钉钉机器人
  • SIEM系统集成:通过Webhook对接安全事件管理系统

示例邮件告警实现:

import smtplib from email.mime.text import MIMEText def send_alert_email(changes_df): msg = MIMEText(changes_df.to_html(), 'html') msg['Subject'] = '资产变更告警' msg['From'] = 'monitor@example.com' msg['To'] = 'security-team@example.com' with smtplib.SMTP('smtp.example.com') as server: server.send_message(msg)

4.2 自动化报表生成

使用pandas的样式功能创建专业报表:

def generate_report(changes_df): report = changes_df.style\ .applymap(highlight_critical, subset=['type'])\ .set_properties(**{'text-align': 'left'})\ .set_table_styles([ {'selector': 'th', 'props': [('background-color', '#f5f5f5')]} ]) with open('report.html', 'w') as f: f.write(report.render()) def highlight_critical(val): color = 'red' if val == '新增' else 'orange' if val == '变更' else 'gray' return f'color: {color}'

5. 高级应用与优化技巧

5.1 性能优化策略

当监控大量资产时,需要考虑API调用效率:

  • 并行请求:使用多线程/协程并发处理
  • 结果缓存:减少重复查询
  • 增量检查:只检查可能发生变更的IP段

5.2 与企业系统集成

将监控系统融入现有DevOps工具链:

# Jenkins集成示例 def jenkins_callback(build_url): payload = { "text": f"资产扫描完成,查看报告: {build_url}" } requests.post(JENKINS_WEBHOOK, json=payload) # Prometheus监控指标导出 from prometheus_client import start_http_server, Gauge assets_gauge = Gauge('zoomeye_assets', 'Discovered assets count') def export_metrics(): data = search_assets("cidr:192.168.1.0/24") assets_gauge.set(len(data.get('matches', [])))

5.3 异常处理与日志

健壮的生产级代码需要完善的错误处理:

import logging logging.basicConfig( filename='monitor.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) def safe_search(query): try: return search_assets(query) except Exception as e: logging.error(f"搜索失败: {str(e)}") notify_admin(f"搜索失败: {query}") return None

在实际项目中,这套系统帮助我们将资产发现时间从原来的数小时缩短到几分钟,变更检测的实时性也从天级别提升到小时级别。一个特别有用的技巧是为不同类型的资产设置不同的扫描频率——对核心业务系统提高扫描频率,而对相对静态的基础设施则降低频率以节省API配额。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询