Google Earth Engine云项目配置全指南:从Cloud Project创建到权限打通
2026/6/13 13:42:51
#!/usr/bin/env python3 """ CDN 测速工具 v2.1 - 测试多个服务地址的连接速度 支持: HTTP延迟、TCP Ping、Traceroute、下载速度测试、JSON导出 """ import urllib.request import urllib.error import time import ssl import socket import statistics import subprocess import struct import sys import json import os from datetime import datetime from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor, as_completed # 测试地址列表 ENDPOINTS = [ { "name": "大陆优化CDN", "url": "https://c.cspok.cn", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "主站直连", "url": "https://anyrouter.top", "desc": "主站后端直连服务" }, { "name": "大陆网络优化-宁波", "url": "https://pmpjfbhq.cn-nb1.rainapp.top", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "大陆网络优化-上海", "url": "https://a-ocnfniawgw.cn-shanghai.fcapp.run", "desc": "针对中国大陆地区优化的后端服务" } ] # 测试参数 TEST_COUNT = 3 TCP_PING_COUNT = 5 TIMEOUT = 10 TRACEROUTE_MAX_HOPS = 20 def create_ssl_context(): """创建SSL上下文""" ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx def parse_url(url): """解析URL获取主机名和端口""" parsed = urlparse(url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'https' else 80) return host, port def resolve_host(hostname): """DNS解析""" try: ip = socket.gethostbyname(hostname) return {"success": True, "ip": ip} except socket.gaierror as e: return {"success": False, "error": str(e)} def tcp_ping(host, port, timeout=5): """TCP Ping - 测试TCP连接延迟""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) start = time.time() sock.connect((host, port)) elapsed = (time.time() - start) * 1000 # ms sock.close() return {"success": True, "latency": elapsed} except socket.timeout: return {"success": False, "error": "连接超时"} except socket.error as e: return {"success": False, "error": str(e)} def tcp_ping_test(host, port, count=TCP_PING_COUNT): """执行多次TCP Ping测试""" results = [] for i in range(count): result = tcp_ping(host, port) results.append(result) time.sleep(0.1) # 短暂间隔 return results def traceroute_tcp(host, port=443, max_hops=TRACEROUTE_MAX_HOPS, timeout=2): """ TCP Traceroute 实现 使用递增的TTL值来追踪路由路径 """ results = [] # 先解析目标IP try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪路由到 {host} ({dest_ip}), 最大 {max_hops} 跳:") print(f" {'跳数':<4} {'IP地址':<18} {'延迟':<12} {'主机名'}") print(" " + "-" * 55) for ttl in range(1, max_hops + 1): # 创建原始socket(需要root权限)或使用UDP hop_result = {"hop": ttl, "ip": None, "latency": None, "hostname": None} try: # 使用UDP进行traceroute(不需要root) recv_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) recv_socket.settimeout(timeout) send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) send_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl) start = time.time() send_socket.sendto(b'', (dest_ip, 33434 + ttl)) try: data, addr = recv_socket.recvfrom(1024) elapsed = (time.time() - start) * 1000 hop_ip = addr[0] # 尝试反向DNS解析 try: hostname = socket.gethostbyaddr(hop_ip)[0] except: hostname = hop_ip hop_result["ip"] = hop_ip hop_result["latency"] = elapsed hop_result["hostname"] = hostname print(f" {ttl:<4} {hop_ip:<18} {elapsed:.2f} ms {hostname}") # 如果到达目标 if hop_ip == dest_ip: results.append(hop_result) break except socket.timeout: hop_result["error"] = "超时" print(f" {ttl:<4} {'*':<18} {'*':<12} 请求超时") recv_socket.close() send_socket.close() except PermissionError: # 没有root权限,使用系统traceroute命令 return traceroute_system(host, max_hops) except Exception as e: hop_result["error"] = str(e) # 尝试使用系统命令 return traceroute_system(host, max_hops) results.append(hop_result) return results def traceroute_system(host, max_hops=TRACEROUTE_MAX_HOPS): """使用系统traceroute命令""" results = [] # 检测可用的traceroute命令 traceroute_cmd = None for cmd in ['traceroute', 'tracepath', 'mtr']: try: subprocess.run([cmd, '--version'], capture_output=True, timeout=2) traceroute_cmd = cmd break except: continue if not traceroute_cmd: print(" 未找到traceroute工具,尝试安装...") return [{"hop": 0, "error": "未找到traceroute工具"}] try: if traceroute_cmd == 'traceroute': cmd = ['traceroute', '-n', '-m', str(max_hops), '-w', '2', host] elif traceroute_cmd == 'tracepath': cmd = ['tracepath', '-n', '-m', str(max_hops), host] else: # mtr cmd = ['mtr', '-n', '-c', '1', '--report', host] print(f" 执行: {' '.join(cmd)}") print() process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # 实时输出 for line in process.stdout: print(f" {line.rstrip()}") # 解析输出 parts = line.split() if parts and parts[0].isdigit(): hop_num = int(parts[0]) results.append({"hop": hop_num, "raw": line.strip()}) process.wait(timeout=60) except subprocess.TimeoutExpired: print(" traceroute 超时") except Exception as e: print(f" traceroute 错误: {e}") return results def traceroute_simple(host, max_hops=TRACEROUTE_MAX_HOPS): """ 简化版TCP traceroute,使用connect超时方式 """ results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) # 尝试直接连接测试 for ttl in range(1, max_hops + 1): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "到达", "latency": elapsed}) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") results.append({"hop": ttl, "status": "超时"}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: # Connection refused - 通常表示到达 print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "中间节点", "latency": elapsed}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "TTL过期", "latency": elapsed}) sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") results.append({"hop": ttl, "error": str(e)}) return results def test_latency(url, timeout=TIMEOUT): """测试HTTP延迟""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url, method='HEAD') req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: status = response.status elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": status} except urllib.error.HTTPError as e: elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": e.code} except Exception as e: return {"success": False, "error": str(e)} def test_download_speed(url, timeout=TIMEOUT): """测试下载速度""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url) req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: data = response.read() size = len(data) elapsed = time.time() - start if elapsed > 0: speed = size / elapsed return {"success": True, "size": size, "time": elapsed, "speed": speed} return {"success": True, "size": size, "time": 0, "speed": 0} except Exception as e: return {"success": False, "error": str(e)} def format_speed(bps): """格式化速度""" if bps >= 1024 * 1024: return f"{bps / (1024 * 1024):.2f} MB/s" elif bps >= 1024: return f"{bps / 1024:.2f} KB/s" else: return f"{bps:.2f} B/s" def format_size(bytes_size): """格式化大小""" if bytes_size >= 1024 * 1024: return f"{bytes_size / (1024 * 1024):.2f} MB" elif bytes_size >= 1024: return f"{bytes_size / 1024:.2f} KB" else: return f"{bytes_size} B" def test_endpoint(endpoint, enable_traceroute=True): """测试单个端点""" name = endpoint["name"] url = endpoint["url"] desc = endpoint["desc"] host, port = parse_url(url) print(f"\n{'='*65}") print(f"📡 {name}") print(f" URL: {url}") print(f" 描述: {desc}") print(f"{'='*65}") # 结果数据结构 result_data = { "name": name, "url": url, "description": desc, "host": host, "port": port, "test_time": datetime.now().isoformat(), "dns": {}, "tcp_ping": {}, "traceroute": [], "http": {}, "download": {}, "summary": {} } # DNS 解析 print(f"\n🔍 DNS解析:") dns_result = resolve_host(host) if dns_result["success"]: ip = dns_result["ip"] print(f" {host} -> {ip}") result_data["dns"] = {"success": True, "ip": ip, "hostname": host} else: print(f" ❌ DNS解析失败: {dns_result['error']}") result_data["dns"] = {"success": False, "error": dns_result['error']} result_data["summary"] = { "avg_latency": None, "tcp_latency": None, "speed": None, "status": "DNS解析失败" } return result_data # TCP Ping 测试 print(f"\n🏓 TCP Ping 测试 (端口 {port}, 共{TCP_PING_COUNT}次):") tcp_results = tcp_ping_test(ip, port, TCP_PING_COUNT) tcp_latencies = [] tcp_details = [] for i, result in enumerate(tcp_results, 1): if result["success"]: lat = result["latency"] tcp_latencies.append(lat) tcp_details.append({"seq": i, "latency_ms": round(lat, 2), "success": True}) print(f" 第{i}次: {lat:.2f} ms") else: tcp_details.append({"seq": i, "success": False, "error": result['error']}) print(f" 第{i}次: 失败 - {result['error']}") result_data["tcp_ping"]["details"] = tcp_details if tcp_latencies: tcp_avg = statistics.mean(tcp_latencies) tcp_min = min(tcp_latencies) tcp_max = max(tcp_latencies) tcp_jitter = statistics.stdev(tcp_latencies) if len(tcp_latencies) > 1 else 0 tcp_loss = (TCP_PING_COUNT - len(tcp_latencies)) / TCP_PING_COUNT * 100 print(f"\n 📊 TCP Ping 统计:") print(f" 平均: {tcp_avg:.2f} ms") print(f" 最小: {tcp_min:.2f} ms") print(f" 最大: {tcp_max:.2f} ms") print(f" 抖动: {tcp_jitter:.2f} ms") print(f" 丢包: {tcp_loss:.1f}%") result_data["tcp_ping"]["statistics"] = { "avg_ms": round(tcp_avg, 2), "min_ms": round(tcp_min, 2), "max_ms": round(tcp_max, 2), "jitter_ms": round(tcp_jitter, 2), "packet_loss_percent": round(tcp_loss, 1), "success_count": len(tcp_latencies), "total_count": TCP_PING_COUNT } else: tcp_avg = None print(f"\n ❌ TCP Ping 全部失败") result_data["tcp_ping"]["statistics"] = {"success": False, "packet_loss_percent": 100} # Traceroute 测试 if enable_traceroute: print(f"\n🛤️ Traceroute 测试:") trace_results = traceroute_simple_with_data(ip, max_hops=15) result_data["traceroute"] = trace_results # HTTP 延迟测试 print(f"\n⏱️ HTTP 延迟测试 (共{TEST_COUNT}次):") latencies = [] http_details = [] http_status = None for i in range(TEST_COUNT): result = test_latency(url) if result["success"]: latency = result["latency"] latencies.append(latency) http_status = result.get("status", "N/A") http_details.append({"seq": i+1, "latency_ms": round(latency, 2), "status": http_status, "success": True}) print(f" 第{i+1}次: {latency:.2f} ms (HTTP {http_status})") else: http_details.append({"seq": i+1, "success": False, "error": result['error']}) print(f" 第{i+1}次: 失败 - {result['error']}") result_data["http"]["details"] = http_details result_data["http"]["status_code"] = http_status if latencies: avg_latency = statistics.mean(latencies) min_latency = min(latencies) max_latency = max(latencies) print(f"\n 📊 HTTP延迟统计:") print(f" 平均: {avg_latency:.2f} ms") print(f" 最小: {min_latency:.2f} ms") print(f" 最大: {max_latency:.2f} ms") result_data["http"]["statistics"] = { "avg_ms": round(avg_latency, 2), "min_ms": round(min_latency, 2), "max_ms": round(max_latency, 2), "success_count": len(latencies), "total_count": TEST_COUNT } else: avg_latency = None print(f"\n ❌ HTTP延迟测试失败") result_data["http"]["statistics"] = {"success": False} # 下载测试 print(f"\n📥 下载测试:") dl_result = test_download_speed(url) if dl_result["success"]: size = dl_result["size"] dl_time = dl_result["time"] speed = dl_result["speed"] print(f" 下载大小: {format_size(size)}") print(f" 下载用时: {dl_time:.3f} 秒") print(f" 下载速度: {format_speed(speed)}") result_data["download"] = { "success": True, "size_bytes": size, "time_seconds": round(dl_time, 3), "speed_bps": round(speed, 2), "speed_formatted": format_speed(speed) } else: speed = None print(f" ❌ 下载失败 - {dl_result['error']}") result_data["download"] = {"success": False, "error": dl_result['error']} # 汇总 result_data["summary"] = { "avg_latency": round(avg_latency, 2) if avg_latency else None, "tcp_latency": round(tcp_avg, 2) if tcp_avg else None, "speed": round(speed, 2) if speed else None, "status": "可用" if tcp_avg else "不可用" } return result_data def traceroute_simple_with_data(host, max_hops=TRACEROUTE_MAX_HOPS): """简化版TCP traceroute,返回数据""" results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) for ttl in range(1, max_hops + 1): hop_data = {"hop": ttl} try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") hop_data.update({"status": "到达目标", "latency_ms": round(elapsed, 2), "reached": True}) results.append(hop_data) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") hop_data.update({"status": "超时", "timeout": True}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") hop_data.update({"status": "中间节点", "latency_ms": round(elapsed, 2)}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") hop_data.update({"status": "TTL过期", "latency_ms": round(elapsed, 2)}) sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") hop_data.update({"error": str(e)}) results.append(hop_data) return results def print_summary(results): """打印汇总结果""" print("\n\n" + "="*70) print("📈 测速结果汇总") print("="*70) # 按TCP延迟排序 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: x["summary"]["tcp_latency"]) print(f"\n{'排名':<4} {'名称':<22} {'TCP延迟':<12} {'HTTP延迟':<12} {'速度':<12}") print("-" * 70) for i, r in enumerate(valid_results, 1): tcp_str = f"{r['summary']['tcp_latency']:.2f} ms" if r['summary']['tcp_latency'] else "N/A" http_str = f"{r['summary']['avg_latency']:.2f} ms" if r['summary']['avg_latency'] else "N/A" speed_str = format_speed(r['summary']['speed']) if r['summary']['speed'] else "N/A" print(f"{i:<4} {r['name']:<22} {tcp_str:<12} {http_str:<12} {speed_str:<12}") # 显示失败的 failed = [r for r in results if r["summary"].get("tcp_latency") is None] if failed: print("\n❌ 无法连接的节点:") for r in failed: print(f" - {r['name']} ({r['url']})") # 推荐 if valid_results: best = valid_results[0] print(f"\n" + "="*70) print(f"✅ 推荐使用: {best['name']}") print(f" URL: {best['url']}") if best['summary']['tcp_latency']: print(f" TCP延迟: {best['summary']['tcp_latency']:.2f} ms") if best['summary']['avg_latency']: print(f" HTTP延迟: {best['summary']['avg_latency']:.2f} ms") def export_json(results, output_file=None): """导出测试结果到JSON文件""" if output_file is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f"speedtest_result_{timestamp}.json" # 构建完整报告 report = { "report_info": { "tool": "CDN 测速工具", "version": "2.1", "generated_at": datetime.now().isoformat(), "test_parameters": { "tcp_ping_count": TCP_PING_COUNT, "http_test_count": TEST_COUNT, "timeout_seconds": TIMEOUT, "traceroute_max_hops": TRACEROUTE_MAX_HOPS } }, "endpoints": results, "ranking": [], "recommendation": None } # 生成排名 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: x["summary"]["tcp_latency"]) for i, r in enumerate(valid_results, 1): report["ranking"].append({ "rank": i, "name": r["name"], "url": r["url"], "tcp_latency_ms": r["summary"]["tcp_latency"], "http_latency_ms": r["summary"]["avg_latency"], "speed_bps": r["summary"]["speed"] }) # 推荐 if valid_results: best = valid_results[0] report["recommendation"] = { "name": best["name"], "url": best["url"], "tcp_latency_ms": best["summary"]["tcp_latency"], "http_latency_ms": best["summary"]["avg_latency"], "reason": "TCP延迟最低" } # 写入文件 with open(output_file, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) return output_file def main(): import argparse parser = argparse.ArgumentParser(description='CDN 测速工具 v2.1') parser.add_argument('--no-traceroute', '-t', action='store_true', help='禁用 traceroute 测试') parser.add_argument('--quick', '-q', action='store_true', help='快速模式(减少测试次数)') parser.add_argument('--json', '-j', nargs='?', const='auto', default=None, metavar='FILE', help='导出结果到JSON文件(不指定文件名则自动生成)') parser.add_argument('--output', '-o', type=str, default=None, help='指定JSON输出文件路径') args = parser.parse_args() global TEST_COUNT, TCP_PING_COUNT if args.quick: TEST_COUNT = 1 TCP_PING_COUNT = 3 print("\n" + "="*70) print(" 🚀 CDN 测速工具 v2.1") print(" 支持: TCP Ping | Traceroute | HTTP延迟 | 下载速度 | JSON导出") print(" 测试中国大陆优化节点") print("="*70) results = [] enable_traceroute = not args.no_traceroute for endpoint in ENDPOINTS: result = test_endpoint(endpoint, enable_traceroute=enable_traceroute) results.append(result) print_summary(results) # 导出JSON json_file = args.output or args.json if json_file: if json_file == 'auto': json_file = None # 让export_json自动生成文件名 output_path = export_json(results, json_file) print(f"\n📄 测试结果已导出到: {output_path}") print("\n" + "="*70) print("测速完成!") print("="*70 + "\n") if __name__ == "__main__": main()