#!/usr/bin/env python3
# coding: utf-8
"""Order screening service: match uploaded order sheets against suspect lists.

Reconstructed from git format-patch c32600730ddd ("Add main.py", 2025-10-07).

The app serves a static front end, exposes CRUD endpoints for JSON "suspect
library" files stored under ``suspects/``, and provides ``/api/analyze`` which
scores every order in an uploaded ``.xlsx`` file against every record of a
chosen suspect library.
"""

import json
import os
import re
import shutil
import uuid
from datetime import datetime
from functools import wraps
from pathlib import Path

import pandas as pd
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS

app = Flask(__name__, static_folder='static', static_url_path='')
CORS(app)

# Configuration: storage folders live next to this file and are created on import.
BASE_DIR = Path(__file__).resolve().parent
SUSPECTS_FOLDER = BASE_DIR / "suspects"
UPLOAD_FOLDER = BASE_DIR / "uploads"
SUSPECTS_FOLDER.mkdir(exist_ok=True)
UPLOAD_FOLDER.mkdir(exist_ok=True)

# Maximum accepted request body size (16 MB).
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024


# ----------------- Error-handling decorator -----------------
def handle_errors(f):
    """Wrap a view so that uncaught exceptions become JSON error responses.

    ``FileNotFoundError`` maps to 404, ``ValueError`` to 400 and any other
    exception to 500 (logged with its traceback).
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except FileNotFoundError:
            return jsonify({'error': '文件不存在'}), 404
        except ValueError as e:
            return jsonify({'error': f'数据格式错误: {str(e)}'}), 400
        except Exception as e:
            app.logger.exception(f'Error in {f.__name__}: {str(e)}')
            return jsonify({'error': f'服务器错误: {str(e)}'}), 500
    return decorated_function


# ----------------- Helpers -----------------

def safe_filename(name: str) -> str:
    """Return a sanitized ``.json`` file name derived from *name*.

    Directory components are dropped, every character other than word
    characters, ``-``, ``_`` and ``.`` is replaced by ``_``, and a ``.json``
    suffix is appended when missing. An empty *name* yields ``''``.
    """
    if not name:
        return ''
    # Keep only the final path component.
    name = os.path.basename(name)
    # Replace characters that are not allowed in stored file names.
    name = re.sub(r'[^\w\-_.]', '_', name)
    if not name.endswith('.json'):
        name = name + '.json'
    return name


def read_json_file(path: Path):
    """Load a JSON list from *path*.

    Returns ``[]`` when the file does not exist, cannot be read or parsed, or
    does not contain a list at the top level.
    """
    if not path.exists():
        return []
    try:
        with path.open('r', encoding='utf-8') as f:
            data = json.load(f)
        return data if isinstance(data, list) else []
    except json.JSONDecodeError:
        app.logger.warning(f'Invalid JSON in {path}')
        return []
    except Exception as e:
        app.logger.error(f'Error reading {path}: {str(e)}')
        return []


def write_json_file(path: Path, data):
    """Serialize *data* to *path* as UTF-8 JSON, backing up any existing file.

    The previous content is copied to ``<name>.json.bak`` first; a failed
    backup is logged and does not prevent the write.
    """
    if path.exists():
        backup_path = path.with_suffix('.json.bak')
        try:
            shutil.copy2(path, backup_path)
        except Exception as e:
            app.logger.warning(f'Backup failed: {str(e)}')

    with path.open('w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def validate_record(record: dict) -> tuple[bool, str]:
    """Check that a suspect record has the required fields.

    Returns ``(True, '')`` when the record is acceptable, otherwise
    ``(False, message)``. A non-dict record is rejected instead of raising.
    """
    if not isinstance(record, dict):
        return False, '记录格式不正确'

    required_fields = ['name', 'phone']
    for field in required_fields:
        if not record.get(field):
            return False, f'缺少必填字段: {field}'

    # Loose phone check: digits, dashes, mask asterisks and whitespace only.
    # str() guards against clients sending the phone as a JSON number.
    phone = record.get('phone', '')
    if phone and not re.match(r'^[\d\-\*\s]+$', str(phone)):
        return False, '手机号格式不正确'

    return True, ''


def process_order_ids(order_id_str):
    """Split a comma-separated order-id string.

    Returns ``(full_order_ids, suffixes)`` where *suffixes* holds the last four
    characters of each id that is at least four characters long.
    """
    if not order_id_str:
        return [], []

    full_order_ids = []
    suffixes = []

    for oid in str(order_id_str).split(','):
        oid_clean = oid.strip()
        if not oid_clean:
            continue

        full_order_ids.append(oid_clean)
        if len(oid_clean) >= 4:
            suffixes.append(oid_clean[-4:])

    return full_order_ids, suffixes


def _apply_order_ids(record: dict) -> None:
    """Normalize the order-id fields of *record* in place.

    Sets ``full_order_ids`` and ``suffixes`` and rewrites ``order_id`` as the
    comma-joined full ids (kept for backward compatibility).
    """
    if record.get('order_id'):
        full_order_ids, suffixes = process_order_ids(record['order_id'])
        record['full_order_ids'] = full_order_ids
        record['suffixes'] = suffixes
        record['order_id'] = ','.join(full_order_ids)
    else:
        record['full_order_ids'] = []
        record['suffixes'] = []
        record['order_id'] = ''


def _cell_to_text(value) -> str:
    """Convert a spreadsheet cell value to clean text.

    Missing values become ``''`` and integral floats lose the trailing ``.0``
    so that numeric phone numbers / order ids keep their digits intact.
    """
    if value is None or pd.isna(value):
        return ""
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value).strip()


# ---------- Excel -> orders ----------
def process_excel_to_json(file_path):
    """Convert an uploaded Excel sheet into a list of order dicts.

    Chinese column headers are mapped to internal field names, an ``address``
    field is built from whichever province/city/district columns exist, rows
    without name or phone are dropped and price-adjustment items are filtered
    out.

    Raises:
        ValueError: if the file cannot be read or contains no rows.
    """
    try:
        df = pd.read_excel(file_path)
    except Exception as e:
        raise ValueError(f'Excel 文件读取失败: {str(e)}') from e

    if df.empty:
        raise ValueError('Excel 文件为空')

    rename_map = {
        "收件人": "name",
        "收件电话": "phone",
        "收件省": "省",
        "收件市": "市",
        "收件区": "区",
        "店铺": "store",
        "平台订单号": "order_id",
        "商品名称": "product",
        "商品名": "product",
        "规格名称": "sku",
        "实收金额": "price"
    }
    df = df.rename(columns={c: rename_map[c] for c in df.columns if c in rename_map})
    # Two source headers can map to the same field; keep the first occurrence
    # so that column access below yields a Series, not a DataFrame.
    df = df.loc[:, ~df.columns.duplicated()]

    # Build the address from the region columns that are actually present.
    region_cols = [c for c in ("省", "市", "区") if c in df.columns]
    if region_cols:
        df["address"] = (
            df[region_cols].fillna("").astype(str).agg(" ".join, axis=1).str.strip()
        )
    elif "address" not in df.columns:
        df["address"] = ""

    # Drop rows that lack a required field.
    df = df.dropna(subset=[c for c in ["name", "phone"] if c in df.columns], how='any')

    # Filter out price-adjustment pseudo products.
    if "product" in df.columns:
        df = df[~df["product"].astype(str).str.contains("补收差价|补差价专用", na=False)]

    # Keep only the columns the matcher knows about.
    cols = ["name", "phone", "address", "store", "order_id", "product", "sku", "price"]
    existing_cols = [c for c in cols if c in df.columns]
    df = df[existing_cols].copy()

    # The matching helpers only accept strings; pandas may have typed these
    # columns as numbers (or left NaN), which would silently never match.
    for col in ("name", "phone", "address", "order_id"):
        if col in df.columns:
            df[col] = df[col].map(_cell_to_text)

    return df.to_dict(orient="records")


# ---------- Matching helpers ----------
def get_suffix(order_id):
    """Return the last four characters of a string order id, else ``None``."""
    if isinstance(order_id, str) and len(order_id) >= 4:
        return order_id[-4:]
    return None


def clean_masked(text):
    """Strip mask asterisks and surrounding whitespace; non-strings give ``''``."""
    if not isinstance(text, str):
        return ""
    return text.replace("*", "").strip()


def get_city(address):
    """Return the first run of CJK characters ending in ``市`` found in *address*."""
    if not isinstance(address, str):
        return ""
    match = re.search(r"([\u4e00-\u9fa5]+市)", address)
    return match.group(1) if match else ""


def name_match(a, b):
    """Return True when both unmasked names are non-empty and share a first character."""
    a_clean, b_clean = clean_masked(a), clean_masked(b)
    return bool(a_clean and b_clean and a_clean[0] == b_clean[0])


def phone_last_digit(phone):
    """Return the last digit found in a string phone number, or ``''``."""
    if not isinstance(phone, str):
        return ""
    digits = re.findall(r"\d", phone)
    return digits[-1] if digits else ""


def phone_match(a, b):
    """Return True when both phones have a last digit and those digits are equal."""
    last_a = phone_last_digit(a)
    last_b = phone_last_digit(b)
    return bool(last_a and last_b and last_a == last_b)


def calc_match_score(order, suspect):
    """Score how closely *order* resembles *suspect*.

    An identical address is worth 6 points, plus 4 for an order-id suffix hit,
    2 for a name match and 1 for a phone match. A same-city match is worth 3
    points, plus 2 / 1 / 1 for suffix / name / phone. Anything else scores 0.

    Returns:
        ``(score, percentage, same_address, same_city)`` where *percentage*
        is the score relative to the maximum of 13, truncated to an int.
    """
    max_score = 13

    addr_a = (order.get("address") or "").strip()
    addr_b = (suspect.get("address") or "").strip()

    same_address = bool(addr_a and addr_b and addr_a == addr_b)
    order_city = get_city(addr_a)
    same_city = (not same_address) and bool(order_city) and order_city == get_city(addr_b)

    if not (same_address or same_city):
        return 0, 0, False, False

    # Suspect order ids are compared through the precomputed "suffixes" field.
    suspect_suffixes = suspect.get("suffixes") or []
    order_suffix = get_suffix(order.get("order_id", ""))
    suffix_hit = bool(order_suffix) and order_suffix in suspect_suffixes
    name_hit = name_match(order.get("name"), suspect.get("name"))
    phone_hit = phone_match(order.get("phone"), suspect.get("phone"))

    if same_address:
        score = 6
        if suffix_hit:
            score += 4
        if name_hit:
            score += 2
        if phone_hit:
            score += 1
    else:
        score = 3
        if name_hit:
            score += 1
        if phone_hit:
            score += 1
        if suffix_hit:
            score += 2

    percentage = int((score / max_score) * 100)
    return score, percentage, same_address, same_city


def _suspect_summary(suspect):
    """Build the customer entry reported for a matched suspect."""
    return {
        "name": suspect.get("name", ""),
        "phone": suspect.get("phone", ""),
        "address": suspect.get("address", ""),
        "store": suspect.get("store", ""),
        "registertime": suspect.get("registertime", ""),
        "order_id": suspect.get("full_order_ids", []),  # full order ids
        "note": suspect.get("note", ""),
        "info_url": suspect.get("info_url", ""),
        "matched_orders": []
    }


def match_orders_with_suspects(orders, suspects):
    """Match every order against every suspect and group the hits.

    Returns ``(full_matches, city_matches)``: lists of
    ``{"address": ..., "customers": [...]}`` and
    ``{"city": ..., "customers": [...]}`` respectively. Within a group,
    customers are keyed by suspect name (``"未知"`` when empty).
    """
    full_grouped = {}
    city_grouped = {}

    for order in orders:
        for suspect in suspects:
            score, percentage, addr_match, city_match = calc_match_score(order, suspect)
            if score <= 0:
                continue

            order_info = {
                "orderId": order.get("order_id", ""),
                "orderName": order.get("name", ""),
                "orderPhone": order.get("phone", ""),
                "score": score,
                "percentage": percentage
            }

            if addr_match:
                group_key = order.get("address", "")
                bucket = full_grouped
            elif city_match:
                group_key = get_city(order.get("address", ""))
                if not group_key:
                    continue
                bucket = city_grouped
            else:
                continue

            name = suspect.get("name", "") or "未知"
            customers = bucket.setdefault(group_key, {})
            cust_group = customers.setdefault(name, _suspect_summary(suspect))
            cust_group["matched_orders"].append(order_info)

    full_matches = [
        {"address": addr, "customers": list(customers.values())}
        for addr, customers in full_grouped.items()
    ]
    city_matches = [
        {"city": city, "customers": list(customers.values())}
        for city, customers in city_grouped.items()
    ]
    return full_matches, city_matches


# ----------------- Routes -----------------

@app.route('/')
def index():
    """Serve the front-end entry page."""
    return send_from_directory('static', 'index.html')


@app.route('/<path:path>')
def static_files(path):
    """Serve any other file from the static folder."""
    return send_from_directory('static', path)


@app.route('/api/suspects', methods=['GET'])
@handle_errors
def list_suspects():
    """List the names of all suspect library files (``*.json``), sorted."""
    files = sorted([f for f in os.listdir(SUSPECTS_FOLDER) if f.endswith('.json')])
    return jsonify({'files': files})


@app.route('/api/suspects/<filename>', methods=['GET', 'POST', 'PUT', 'DELETE'])
@handle_errors
def handle_suspect_file(filename):
    """Read (GET), create (POST), overwrite (PUT) or delete a suspect library file."""
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name

    if request.method == 'GET':
        data = read_json_file(file_path)
        return jsonify({'data': data})

    if request.method == 'POST':
        if file_path.exists():
            return jsonify({'error': '文件已存在'}), 400
        payload = request.get_json(silent=True) or {}
        initial_data = payload.get('data', []) if isinstance(payload, dict) else []
        # Libraries are always lists; anything else would read back as empty.
        if not isinstance(initial_data, list):
            return jsonify({'error': 'data 必须为数组'}), 400
        write_json_file(file_path, initial_data)
        return jsonify({'message': '创建成功', 'filename': safe_name})

    if request.method == 'PUT':
        payload = request.get_json(silent=True)
        if payload is None:
            return jsonify({'error': '无有效 JSON 数据'}), 400
        data_to_write = payload.get('data', []) if isinstance(payload, dict) else payload
        if not isinstance(data_to_write, list):
            return jsonify({'error': 'data 必须为数组'}), 400
        write_json_file(file_path, data_to_write)
        return jsonify({'message': '更新成功'})

    if request.method == 'DELETE':
        if not file_path.exists():
            return jsonify({'error': '文件不存在'}), 404
        file_path.unlink()
        return jsonify({'message': '删除成功'})

    return jsonify({'error': '不支持的方法'}), 405


@app.route('/api/suspects/<filename>/records', methods=['POST'])
@handle_errors
def add_suspect_record(filename):
    """Append one validated record to an existing suspect library."""
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name

    if not file_path.exists():
        return jsonify({'error': '文件不存在'}), 404

    payload = request.get_json(silent=True)
    if not payload:
        return jsonify({'error': '缺少请求体'}), 400

    is_valid, error_msg = validate_record(payload)
    if not is_valid:
        return jsonify({'error': error_msg}), 400

    data = read_json_file(file_path)
    new_record = payload

    # Stamp the registration time unless the client supplied one.
    if not new_record.get('registertime'):
        new_record['registertime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Store both the full order ids and their four-character suffixes.
    _apply_order_ids(new_record)

    data.append(new_record)
    write_json_file(file_path, data)
    return jsonify({'message': '添加成功', 'data': new_record})


@app.route('/api/suspects/<filename>/records/<int:index>', methods=['PUT'])
@handle_errors
def update_suspect_record(filename, index):
    """Replace the record at *index*, preserving its original registration time."""
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name

    if not file_path.exists():
        return jsonify({'error': '文件不存在'}), 404

    payload = request.get_json(silent=True)
    if payload is None:
        return jsonify({'error': '无有效 JSON 数据'}), 400

    is_valid, error_msg = validate_record(payload)
    if not is_valid:
        return jsonify({'error': error_msg}), 400

    data = read_json_file(file_path)
    if index < 0 or index >= len(data):
        return jsonify({'error': '索引超出范围'}), 400

    updated_record = payload
    # Keep the stored registration time; fall back to "now" if it was missing.
    updated_record['registertime'] = data[index].get(
        'registertime', datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    )

    # Store both the full order ids and their four-character suffixes.
    _apply_order_ids(updated_record)

    data[index] = updated_record
    write_json_file(file_path, data)
    return jsonify({'message': '更新成功'})


@app.route('/api/suspects/<filename>/records/<int:index>', methods=['DELETE'])
@handle_errors
def delete_suspect_record(filename, index):
    """Remove the record at *index* from a suspect library."""
    safe_name = safe_filename(filename)
    file_path = SUSPECTS_FOLDER / safe_name

    if not file_path.exists():
        return jsonify({'error': '文件不存在'}), 404

    data = read_json_file(file_path)
    if index < 0 or index >= len(data):
        return jsonify({'error': '索引超出范围'}), 400

    data.pop(index)
    write_json_file(file_path, data)
    return jsonify({'message': '删除成功'})


@app.route('/api/analyze', methods=['POST'])
@handle_errors
def analyze_orders():
    """Match the orders of an uploaded ``.xlsx`` file against a suspect library.

    Expects multipart form data with ``file`` (the spreadsheet) and
    ``suspectFile`` (the library name). Responds with the order count plus the
    full-address and city-level match groups.
    """
    if 'file' not in request.files:
        return jsonify({'error': '未上传文件'}), 400
    if 'suspectFile' not in request.form:
        return jsonify({'error': '未选择恶意客户库'}), 400

    uploaded_file = request.files['file']
    suspect_file = request.form['suspectFile']

    if not uploaded_file.filename:
        return jsonify({'error': '文件名为空'}), 400

    if not uploaded_file.filename.lower().endswith('.xlsx'):
        return jsonify({'error': '仅支持 .xlsx 格式'}), 400

    # Never use the client-supplied name on disk: it may contain path
    # separators, and two simultaneous uploads could overwrite each other.
    save_path = UPLOAD_FOLDER / f'{uuid.uuid4().hex}.xlsx'

    try:
        uploaded_file.save(str(save_path))

        orders = process_excel_to_json(str(save_path))

        safe_name = safe_filename(suspect_file)
        suspect_path = SUSPECTS_FOLDER / safe_name

        if not suspect_path.exists():
            return jsonify({'error': '恶意客户库文件不存在'}), 404

        # Ignore malformed (non-object) entries rather than failing the request.
        suspects = [s for s in read_json_file(suspect_path) if isinstance(s, dict)]

        # Backward compatibility: derive the id fields for legacy records.
        for suspect in suspects:
            if 'full_order_ids' not in suspect or 'suffixes' not in suspect:
                full_order_ids, suffixes = process_order_ids(suspect.get('order_id', ''))
                suspect['full_order_ids'] = full_order_ids
                suspect['suffixes'] = suffixes

        full_grouped, city_grouped = match_orders_with_suspects(orders, suspects)

        return jsonify({
            'totalOrders': len(orders),
            'fullMatches': full_grouped,
            'cityMatches': city_grouped
        })

    finally:
        # Best-effort removal of the temporary upload.
        try:
            if save_path.exists():
                save_path.unlink()
        except Exception as e:
            app.logger.warning(f'Failed to delete temp file: {str(e)}')


# ----------------- Entry point -----------------
if __name__ == '__main__':
    # Use Gunicorn or uWSGI in production.
    debug_mode = os.getenv('FLASK_DEBUG', 'False').lower() == 'true'
    app.run(host='0.0.0.0', port=5000, debug=debug_mode)