拼多多店群自动化上架方案

影刀RPA数据清洗实战:从脏数据到可用数据的转化之路

作者:林焱

采集回来的数据很少能直接用——空值、重复、格式混乱、编码错误、异常值……如果不做清洗就直接分析或入库,得出的结论毫无价值。数据清洗通常占数据分析工作量的60%-80%,而影刀RPA配合Python可以高效自动化这个过程。这篇文章系统讲解数据清洗的各种场景和解决方案。


在这里插入图片描述

一、脏数据的七大类型

类型 示例 影响
缺失值 价格为空、电话为None 统计偏差、计算报错
重复数据 同一商品出现多次 统计虚高
格式混乱 “¥5999” “5,999” “5999.00” 无法计算
编码错误 乱码、“锟斤拷” 文本不可读
异常值 价格=0或价格=999999 分析失真
不一致 “北京”/“北京市”/“BJ” 聚合错误
结构错误 多值在一个字段 “红,蓝,绿” 无法筛选

二、缺失值处理

2.1 识别缺失值

在这里插入图片描述

def 识别缺失值(data):
    """统计数据中的缺失值"""
    missing_report = {}
    
    for key in data[0].keys():
        missing_count = sum(1 for item in data if not item.get(key) or item[key] in ["", None, "null", "NULL", "N/A", "-"])
        missing_pct = missing_count / len(data) * 100
        missing_report[key] = {
            "缺失数": missing_count,
            "缺失率": f"{missing_pct:.1f}%"
        }
    
    return missing_report

# 示例
data = [
    {"name": "iPhone", "price": 5999, "stock": 100},
    {"name": "", "price": None, "stock": 0},
    {"name": "华为", "price": "N/A", "stock": None},
]

report = 识别缺失值(data)
for field, info in report.items():
    print(f"{field}:缺失{info['缺失数']}条({info['缺失率']})")

2.2 缺失值填充策略

def 填充缺失值(data, strategy):
    """根据策略填充缺失值"""
    
    for item in data:
        for key, fill_strategy in strategy.items():
            value = item.get(key)
            
            # 判断是否缺失
            if value is None or value in ["", "null", "NULL", "N/A", "-"]:
                if fill_strategy == "zero":
                    item[key] = 0
                    
                elif fill_strategy == "empty_string":
                    item[key] = ""
                    
                elif fill_strategy == "mean":
                    # 用该列的平均值填充
                    values = [item[key] for item in data if isinstance(item.get(key), (int, float))]
                    item[key] = sum(values) / len(values) if values else 0
                    
                elif fill_strategy == "mode":
                    # 用众数填充
                    from collections import Counter
                    values = [item[key] for item in data if item.get(key) not in [None, "", "N/A"]]
                    if values:
                        item[key] = Counter(values).most_common(1)[0][0]
                    
                elif fill_strategy == "forward_fill":
                    # 用前一个有效值填充
                    pass  # 需要在循环中处理
                    
                elif isinstance(fill_strategy, str) and fill_strategy.startswith("default:"):
                    # 用固定默认值
                    item[key] = fill_strategy.replace("default:", "")
                    
                elif callable(fill_strategy):
                    # 用函数计算
                    item[key] = fill_strategy(item)
    
    return data

# 策略配置
strategy = {
    "price": "mean",                    # 价格用平均值
    "stock": "zero",                     # 库存用0
    "category": "default:未分类",         # 分类用"未分类"
    "name": "empty_string",              # 名称用空字符串
}

cleaned_data = 填充缺失值(data, strategy)

三、重复数据处理

在这里插入图片描述

3.1 完全去重

def 完全去重(data):
    """基于所有字段的完全去重"""
    seen = set()
    unique = []
    
    for item in data:
        # 将字典转为可哈希的元组
        key = tuple(sorted(item.items()))
        if key not in seen:
            seen.add(key)
            unique.append(item)
    
    print(f"去重:{len(data)}{len(unique)}(删除{len(data) - len(unique)}条)")
    return unique

3.2 基于关键字段去重

def 关键字段去重(data, keys, keep="first"):
    """基于指定字段去重"""
    seen = {}
    
    for item in data:
        key = tuple(item.get(k, "") for k in keys)
        
        if key not in seen:
            seen[key] = item
        elif keep == "last":
            seen[key] = item
        elif keep == "max_price":
            # 保留价格更高的
            if item.get("price", 0) > seen[key].get("price", 0):
                seen[key] = item
    
    result = list(seen.values())
    print(f"去重(字段:{keys}):{len(data)}{len(result)}")
    return result

# 使用:基于商品名称+链接去重,保留价格更高的
cleaned = 关键字段去重(data, keys=["name", "url"], keep="max_price")

3.3 模糊去重

在这里插入图片描述

def 模糊去重(data, key_field, threshold=0.8):
    """基于文本相似度的模糊去重"""
    from difflib import SequenceMatcher
    
    def similarity(a, b):
        return SequenceMatcher(None, str(a), str(b)).ratio()
    
    unique = []
    removed = 0
    
    for item in data:
        text = item.get(key_field, "")
        is_duplicate = False
        
        for existing in unique:
            if similarity(text, existing.get(key_field, "")) >= threshold:
                is_duplicate = True
                removed += 1
                break
        
        if not is_duplicate:
            unique.append(item)
    
    print(f"模糊去重:{len(data)}{len(unique)}(删除{removed}条相似数据)")
    return unique

四、格式标准化

4.1 价格格式统一

import re

def 标准化价格(price_str):
    """将各种价格格式统一为浮点数"""
    if not price_str or price_str in ["N/A", "-", "暂无报价"]:
        return 0.0
    
    # 转为字符串处理
    price_str = str(price_str)
    
    # 去掉货币符号
    price_str = re.sub(r'[¥¥$€]', '', price_str)
    
    # 去掉千分位逗号
    price_str = price_str.replace(',', '')
    
    # 去掉空格
    price_str = price_str.strip()
    ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/f1f27b3a7f2c4c349a3e4f41e0ff239e.png#pic_center)

    
    # 提取数字部分
    match = re.search(r'[\d.]+', price_str)
    if match:
        try:
            return float(match.group())
        except ValueError:
            return 0.0
    
    return 0.0

# 测试
test_prices = ["¥5,999.00", "5999", "$99.9", "价格:3999元", "暂无报价", "¥12,345.67", None]
for p in test_prices:
    print(f"{p}{标准化价格(p)}")

4.2 日期格式统一

from datetime import datetime

def 标准化日期(date_str, output_format="%Y-%m-%d"):
    """将各种日期格式统一"""
    if not date_str or date_str in ["N/A", "-", ""]:
        return ""
    
    date_str = str(date_str).strip()
    
    # 常见日期格式列表
    formats = [
        "%Y-%m-%d",       # 2024-03-15
        "%Y/%m/%d",       # 2024/03/15
        "%Y年%m月%d日",    # 2024年03月15日
        "%d/%m/%Y",       # 15/03/2024
        "%m-%d-%Y",       # 03-15-2024
        "%Y%m%d",         # 20240315
        "%b %d, %Y",      # Mar 15, 2024
        "%d %b %Y",       # 15 Mar 2024
    ]
    
    for fmt in formats:
        try:
            dt = datetime.strptime(date_str, fmt)
            return dt.strftime(output_format)
        except ValueError:
            continue
    
    # 尝试用正则提取
    match = re.search(r'(\d{4})[年/\-.](\d{1,2})[月/\-.](\d{1,2})', date_str)
    if match:
        try:
            dt = datetime(int(match.group(1)), int(match.group(2)), int(match.group(3)))
            return dt.strftime(output_format)
        except:
            pass
    
    return date_str  # 无法识别则原样返回

4.3 手机号标准化

def 标准化手机号(phone_str):
    """标准化手机号"""
    if not phone_str:
        return ""
    
    # 只保留数字
    digits = re.sub(r'\D', '', str(phone_str))
    
    # 去掉国际区号
    if digits.startswith("86") and len(digits) == 13:
        digits = digits[2:]
    
    # 验证是否为有效手机号
    if re.match(r'^1[3-9]\d{9}$', digits):
        return digits
    
    return phone_str  # 无效号码原样返回

4.4 地址标准化

def 标准化地址(address):
    """标准化地址(统一省份名称)"""
    if not address:
        return ""
    
    # 省份名称映射
    province_map = {
        "北京": "北京市", "上海": "上海市", "天津": "天津市", "重庆": "重庆市",
        "广东": "广东省", "浙江": "浙江省", "江苏": "江苏省",
        "BJ": "北京市", "SH": "上海市", "GZ": "广东省",
    }
    ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/a9d1bf13ab914ba2b5290e69d84b9f7d.png#pic_center)

    
    # 替换简称
    for short, full in province_map.items():
        if address.startswith(short) and not address.startswith(full):
            address = address.replace(short, full, 1)
            break
    
    return address

五、异常值检测

5.1 统计方法检测

TEMU店群如何管理运营?

def 检测异常值(data, field, method="iqr"):
    """检测数据中的异常值"""
    values = [item.get(field, 0) for item in data if isinstance(item.get(field), (int, float))]
    
    if not values:
        return []
    
    outliers = []
    
    if method == "iqr":
        # IQR方法
        values_sorted = sorted(values)
        n = len(values_sorted)
        q1 = values_sorted[n // 4]
        q3 = values_sorted[3 * n // 4]
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        
        outliers = [item for item in data if isinstance(item.get(field), (int, float)) and (item[field] < lower or item[field] > upper)]
    
    elif method == "zscore":
        # Z-Score方法
        import statistics
        mean = statistics.mean(values)
        std = statistics.stdev(values) if len(values) > 1 else 0
        
        if std > 0:
            outliers = [item for item in data if isinstance(item.get(field), (int, float)) and abs((item[field] - mean) / std) > 3]
    
    elif method == "range":
        # 范围法
        outliers = [item for item in data if isinstance(item.get(field), (int, float)) and (item[field] <= 0 or item[field] > 1000000)]
    
    return outliers

# 使用:检测价格异常值
price_outliers = 检测异常值(data, "price", method="iqr")
print(f"价格异常值:{len(price_outliers)}条")

5.2 异常值处理

在这里插入图片描述

def 处理异常值(data, field, method="clip"):
    """处理异常值"""
    values = [item.get(field, 0) for item in data if isinstance(item.get(field), (int, float))]
    
    if not values:
        return data
    
    # 计算边界
    values_sorted = sorted(values)
    n = len(values_sorted)
    q1 = values_sorted[n // 4]
    q3 = values_sorted[3 * n // 4]
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr
    
    for item in data:
        value = item.get(field)
        if not isinstance(value, (int, float)):
            continue
        
        if method == "clip":
            # 截断:将异常值截断到边界
            item[field] = max(lower, min(upper, value))
            
        elif method == "remove":
            # 删除:标记为无效
            if value < lower or value > upper:
                item[field] = None
                
        elif method == "replace_median":
            # 替换为中位数
            if value < lower or value > upper:
                median = values_sorted[n // 2]
                item[field] = median
    
    return data

六、完整数据清洗流水线

class DataCleaner:
    """数据清洗流水线"""
    
    def __init__(self, config=None):
        self.config = config or {}
        self.stats = {"input": 0, "output": 0, "removed": 0, "fixed": 0}
    
    def clean(self, data):
        """执行完整清洗流水线"""
        self.stats["input"] = len(data)
        
        # 1. 去除完全空行
        data = self._remove_empty_rows(data)
        
        # 2. 缺失值填充
        data = self._fill_missing(data)
        
        # 3. 格式标准化
        data = self._standardize_formats(data)
        
        # 4. 去重
        data = self._deduplicate(data)
        
        # 5. 异常值处理
        data = self._handle_outliers(data)
        
        # 6. 最终校验
        data = self._final_validation(data)
        
        self.stats["output"] = len(data)
        self.stats["removed"] = self.stats["input"] - self.stats["output"]
        
        self._print_report()
        return data
    
    def _remove_empty_rows(self, data):
        """移除所有字段都为空的行"""
        return [item for item in data if any(v for v in item.values() if v not in [None, "", "N/A"])]
    
    def _fill_missing(self, data):
        """填充缺失值"""
        strategy = self.config.get("fill_strategy", {})
        return 填充缺失值(data, strategy)
    
    def _standardize_formats(self, data):
        """格式标准化"""
        for item in data:
            if "price" in item:
                item["price"] = 标准化价格(item["price"])
            if "date" in item:
                item["date"] = 标准化日期(item["date"])
            if "phone" in item:
                item["phone"] = 标准化手机号(item["phone"])
            if "address" in item:
                item["address"] = 标准化地址(item["address"])
            self.stats["fixed"] += 1
        return data
    
    def _deduplicate(self, data):
        """去重"""
        keys = self.config.get("dedup_keys", ["name"])
        return 关键字段去重(data, keys=keys)
    
    def _handle_outliers(self, data):
        """异常值处理"""
        for field in self.config.get("outlier_fields", []):
            data = 处理异常值(data, field, method="clip")
        return data
    
    def _final_validation(self, data):
        """最终校验"""
        valid = []
        for item in data:
            # 检查必填字段
            required = self.config.get("required_fields", [])
            if all(item.get(f) for f in required):
                valid.append(item)
        return valid
    
    def _print_report(self):
        """打印清洗报告"""
        print(f"""数据清洗报告:
    输入:{self.stats['input']}条
    输出:{self.stats['output']}条
    删除:{self.stats['removed']}条
    修正:{self.stats['fixed']}处
    有效率:{self.stats['output']/self.stats['input']*100:.1f}%""")


# 使用
config = {
    "fill_strategy": {"price": "mean", "category": "default:未分类"},
    "dedup_keys": ["name", "url"],
    "outlier_fields": ["price"],
    "required_fields": ["name", "price"],
}

cleaner = DataCleaner(config)
clean_data = cleaner.clean(raw_data)

在这里插入图片描述

总结

数据清洗的核心步骤与工具:

步骤 目的 关键方法
缺失值处理 补全数据 均值/众数/默认值填充
去重 消除重复 完全去重/关键字段去重/模糊去重
格式标准化 统一格式 正则提取/映射表/解析器
异常值处理 消除噪声 IQR/Z-Score/截断/替换
校验 确保质量 必填检查/范围检查/一致性检查

数据清洗的原则:宁可多删不错留,宁可保守不冒险。不确定的数据先标记,不要擅自修改或删除,保留原始数据供回溯。


在这里插入图片描述

作者:林焱 | 觉得有用就收藏,后续分享更多影刀RPA实战技巧
在这里插入图片描述

Logo

电商企业物流数字化转型必备!快递鸟 API 接口,72 小时快速完成物流系统集成。全流程实战1V1指导,营造开放的API技术生态圈。

更多推荐