Sentiment Monitoring in Practice: Compliantly Collecting Xiaohongshu Data to Build a Brand Reputation Analysis System

In today's digital marketing era, Xiaohongshu has become a core battleground for brand word of mouth. Every day, millions of users share consumption experiences, product reviews, and lifestyle content on the platform, and this data is highly valuable for enterprise sentiment monitoring and market research. This article explains how to collect Xiaohongshu data compliantly and build a complete brand reputation analysis system.

1. The Value of Xiaohongshu Data in Sentiment Monitoring

Xiaohongshu (Little Red Book) is China's leading lifestyle-sharing platform. Its user base skews toward young women in first- and second-tier cities with strong purchasing power, and content activity is high. Notes on the platform cover beauty and skincare, fashion and outfits, parenting, travel, dining, and more, making it an important reference in consumer purchase decisions.

For enterprises, monitoring Xiaohongshu data offers the following core value:

1. Reputation early warning: by monitoring brand-related notes, an enterprise can spot negative sentiment the moment it appears, respond quickly, and keep a crisis from spreading.

2. Competitor analysis: tracking competitors' share of voice, content types, and user feedback on Xiaohongshu provides the data to support marketing strategy.

3. KOL screening: data analysis can identify cost-effective mid-tier influencers and ordinary users for an efficient "seeding" matrix (a simple engagement-rate screen is sketched after this list).

4. Trend insight: analyzing trending topics and content patterns reveals where the market is heading, informing product development and marketing plans.
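
As a concrete illustration of the KOL-screening idea in point 3, the sketch below ranks creators by a simple engagement rate (interactions divided by followers). The field names and sample records are hypothetical placeholders, not actual platform fields; in practice the inputs would come from the collection pipeline described later in this article.

# Hypothetical sketch: rank creators by engagement rate to surface
# cost-effective mid-tier KOLs. All field names here are illustrative.
creators = [
    {"nickname": "creator_a", "follower_count": 12000,
     "liked_count": 3400, "collected_count": 800, "commented_count": 150},
    {"nickname": "creator_b", "follower_count": 450000,
     "liked_count": 9000, "collected_count": 1200, "commented_count": 300},
]

def engagement_rate(c):
    interactions = c["liked_count"] + c["collected_count"] + c["commented_count"]
    return interactions / max(c["follower_count"], 1)  # guard against zero followers

# Mid-tier creator_a outranks the much larger creator_b here.
for c in sorted(creators, key=engagement_rate, reverse=True):
    print(f"{c['nickname']}: engagement rate {engagement_rate(c):.2%}")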

2. Technical Principles of Xiaohongshu Data Collection

Xiaohongshu deploys multiple anti-crawling measures to protect its data, including request-parameter signature verification, request-rate limits, User-Agent checks, and IP restrictions. Compliant data collection must stay within the platform's rules; several common technical approaches are introduced below.

2.1 Data Retrieval via the Official API

Xiaohongshu opens up a subset of API endpoints; developers can apply for access and retrieve data through them. This is the most compliant route and yields the highest-quality data. The endpoint URLs and response fields in the example below illustrate the general calling pattern; consult the current official documentation for exact paths and schemas.

import requests
import time

class XiaohongshuAPI:
    """Thin wrapper around the Xiaohongshu open API."""
    def __init__(self, app_id, app_secret):
        self.app_id = app_id
        self.app_secret = app_secret
        self.access_token = None
        self.token_expires_at = 0  # epoch seconds; 0 forces a fetch on first use

    def get_access_token(self):
        # Reuse the cached token until shortly before it expires.
        if self.access_token and time.time() < self.token_expires_at:
            return self.access_token
        url = "https://open.xiaohongshu.com/oauth/access_token"
        params = {
            "app_id": self.app_id,
            "app_secret": self.app_secret,
            "grant_type": "client_credential"
        }
        response = requests.post(url, data=params, timeout=10)
        result = response.json()
        if result.get("code") == 0:
            self.access_token = result["data"]["access_token"]
            # Refresh 300 seconds early so a token never expires mid-request.
            self.token_expires_at = time.time() + result["data"]["expires_in"] - 300
            return self.access_token
        raise RuntimeError(f"Failed to obtain access token: {result}")

    def search_notes(self, keyword, page=1, page_size=20):
        url = "https://open.xiaohongshu.com/api/notes/search"
        headers = {
            "Authorization": f"Bearer {self.get_access_token()}",
            "Content-Type": "application/json"
        }
        payload = {"keyword": keyword, "page": page, "page_size": page_size, "sort": "general"}
        response = requests.post(url, headers=headers, json=payload, timeout=10)
        return response.json()

    def get_note_detail(self, note_id):
        url = f"https://open.xiaohongshu.com/api/notes/{note_id}"
        headers = {"Authorization": f"Bearer {self.get_access_token()}"}
        return requests.get(url, headers=headers, timeout=10).json()

    def get_user_notes(self, user_id, page=1):
        url = f"https://open.xiaohongshu.com/api/users/{user_id}/notes"
        headers = {"Authorization": f"Bearer {self.get_access_token()}"}
        params = {"page": page, "page_size": 20}
        return requests.get(url, headers=headers, params=params, timeout=10).json()

if __name__ == "__main__":
    api = XiaohongshuAPI(app_id="YOUR_APP_ID", app_secret="YOUR_APP_SECRET")
    search_result = api.search_notes(keyword="新能源汽车", page=1)
    print(f"Found {search_result['data']['total']} notes")
    for note in search_result['data']['notes'][:5]:
        print(f"Title: {note['title']}, Likes: {note['liked_count']}")

2.2 Browser-Simulation Collection with Selenium

For scenarios that require simulated logins or complex interaction, Selenium can drive a real browser. This approach gets past some anti-bot checks, but request frequency must be kept strictly under control.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
from bs4 import BeautifulSoup
import time
import random

class XiaohongshuCrawler:
    """Selenium-based Xiaohongshu data collector."""
    def __init__(self, headless=True):
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--window-size=1920,1080")
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36"
        chrome_options.add_argument(f"user-agent={user_agent}")
        # Hide the automation banner and the navigator.webdriver flag
        # that basic bot checks look for.
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        self.driver = webdriver.Chrome(options=chrome_options)
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
        })

    def wait_random(self, min_sec=1, max_sec=3):
        # Randomized delays mimic human pacing and keep server load low.
        time.sleep(random.uniform(min_sec, max_sec))

    def scroll_page(self, scroll_count=3):
        for _ in range(scroll_count):
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            self.wait_random(1, 2)
            # Nudge back up slightly, the way a human reader scrolls.
            self.driver.execute_script("window.scrollBy(0, -200);")
            self.wait_random(0.5, 1)

    def search(self, keyword, max_notes=50):
        search_url = f"https://www.xiaohongshu.com/search_result?keyword={keyword}&type=51"
        self.driver.get(search_url)
        # Wait for the first note card to render; the CSS selectors below are
        # assumptions about the current front-end and may need updating.
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CLASS_NAME, "note-item"))
            )
        except TimeoutException:
            return []
        notes, seen = [], set()
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        while len(notes) < max_notes:
            self.scroll_page(scroll_count=2)
            soup = BeautifulSoup(self.driver.page_source, "html.parser")
            for card in soup.select(".note-item"):
                try:
                    title = card.select_one(".title").text.strip()
                    author = card.select_one(".user-name").text.strip()
                    likes = card.select_one(".like-count").text.strip()
                except AttributeError:
                    continue  # card is missing an expected element; skip it
                if (title, author) in seen:
                    continue  # the page is re-parsed on every scroll, so skip repeats
                seen.add((title, author))
                notes.append({"title": title, "author": author, "likes": likes})
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break  # nothing new loaded; stop scrolling
            last_height = new_height
        return notes[:max_notes]

    def close(self):
        self.driver.quit()

if __name__ == "__main__":
    crawler = XiaohongshuCrawler(headless=True)
    try:
        results = crawler.search("面膜推荐", max_notes=30)
        print(f"Collected {len(results)} notes")
        for r in results:
            print(f"- {r['title']} | Author: {r['author']} | Likes: {r['likes']}")
    finally:
        crawler.close()

3. Best Practices for Compliant Data Collection

3.1 Respect the robots.txt Protocol

Before collecting anything, check Xiaohongshu's robots.txt file to learn which paths the platform permits crawlers to access.

import urllib.robotparser

def check_robots_txt(base_url):
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(f"{base_url}/robots.txt")
    try:
        rp.read()
        print(f"Generic crawlers may fetch '/': {rp.can_fetch('*', '/')}")
        print(f"Googlebot may fetch '/': {rp.can_fetch('Googlebot', '/')}")
        # site_maps() (Python 3.8+) returns the file's Sitemap entries, if any.
        for sitemap in rp.site_maps() or []:
            print(f"Sitemap: {sitemap}")
    except Exception as e:
        print(f"Failed to read robots.txt: {e}")

check_robots_txt("https://www.xiaohongshu.com")

3.2 Throttle Request Frequency

High-frequency requests put pressure on the target server and are likely to trigger anti-bot defenses. Set a reasonable delay between requests; the token-bucket limiter below enforces an average rate per key.

import time
import threading
from collections import defaultdict

class RateLimiter:
    """Token-bucket limiter: allows `rate` requests every `per` seconds."""
    def __init__(self, rate=5, per=60):
        self.rate = rate
        self.per = per
        # Start each bucket full so the first requests are not delayed.
        self.allowance = defaultdict(lambda: float(rate))
        self.last_check = defaultdict(time.time)
        self.lock = threading.Lock()

    def acquire(self, key="default"):
        with self.lock:
            current = time.time()
            time_passed = current - self.last_check[key]
            self.last_check[key] = current
            # Refill tokens in proportion to elapsed time, capped at `rate`.
            self.allowance[key] = min(
                self.rate, self.allowance[key] + time_passed * (self.rate / self.per)
            )
            if self.allowance[key] < 1.0:
                # Not enough tokens: sleep until one is available.
                sleep_time = (1.0 - self.allowance[key]) * (self.per / self.rate)
                time.sleep(sleep_time)
                self.allowance[key] = 0
            else:
                self.allowance[key] -= 1.0
            return True

limiter = RateLimiter(rate=5, per=60)
for i in range(10):
    limiter.acquire()
    print(f"Request {i + 1} completed")

3.3 Data Storage and Security
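
Collected data needs durable, deduplicated storage. The class below persists notes and authors to SQLite: note_id carries a UNIQUE constraint so repeated crawls update rather than duplicate a record, and indexes on note_id, author_id, and keyword keep the common lookups fast.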

import sqlite3
import hashlib
from datetime import datetime

class DataStorage:
    """SQLite-backed storage for collected notes and authors."""
    def __init__(self, db_path="xiaohongshu_data.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS notes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                note_id TEXT UNIQUE, title TEXT, content TEXT,
                author TEXT, author_id TEXT,
                liked_count INTEGER, collected_count INTEGER,
                commented_count INTEGER, topics TEXT,
                published_at TEXT, crawled_at TEXT, keyword TEXT, md5 TEXT
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS authors (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                author_id TEXT UNIQUE, nickname TEXT,
                followers INTEGER, verified BOOLEAN, crawled_at TEXT
            )
        """)
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_id ON notes(note_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_author ON notes(author_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_keyword ON notes(keyword)")
        conn.commit()
        conn.close()

    def save_note(self, note_data):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Fingerprint of id + title, handy for spotting edited notes between crawls.
        md5_hash = hashlib.md5(
            (note_data.get("note_id", "") + note_data.get("title", "")).encode()
        ).hexdigest()
        try:
            cursor.execute("""
                INSERT OR REPLACE INTO notes (
                    note_id, title, content, author, author_id,
                    liked_count, collected_count, commented_count,
                    topics, published_at, crawled_at, keyword, md5
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                note_data.get("note_id"), note_data.get("title"),
                note_data.get("content"), note_data.get("author"),
                note_data.get("author_id"), note_data.get("liked_count", 0),
                note_data.get("collected_count", 0), note_data.get("commented_count", 0),
                ",".join(note_data.get("topics", [])), note_data.get("published_at"),
                datetime.now().isoformat(), note_data.get("keyword"), md5_hash
            ))
            conn.commit()
        finally:
            conn.close()

    def get_stats(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM notes")
        total_notes = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(DISTINCT author_id) FROM notes")
        total_authors = cursor.fetchone()[0]
        conn.close()
        return {"total_notes": total_notes, "total_authors": total_authors}

storage = DataStorage("jiayus_yuqing.db")
stats = storage.get_stats()
print(f"Total notes: {stats['total_notes']}")

4. Building the Sentiment Monitoring System

With raw data in hand, the next steps are cleaning, analysis, and visualization, which together form the complete monitoring system. The classifier below is a simple keyword-lexicon baseline.

from collections import Counter
import re

class SentimentAnalyzer:
    """Keyword-lexicon sentiment classifier (a deliberately simple baseline)."""
    def __init__(self):
        # Small illustrative lexicons; real systems need much larger vocabularies.
        self.positive_words = set([
            "优秀", "完美", "惊艳", "推荐", "值得", "好用", "喜欢",
            "超赞", "非常", "满意", "良心", "靠谱", "棒", "赞"
        ])
        self.negative_words = set([
            "失望", "差", "坑", "骗", "烂", "难用", "后悔", "假货",
            "过敏", "投诉", "维权", "虚假", "欺骗", "垃圾", "差评"
        ])

    def clean_text(self, text):
        if not text:
            return ""
        # Keep Chinese characters, letters, and digits; strip punctuation and emoji.
        text = re.sub(r"[^\u4e00-\u9fa5a-zA-Z0-9]", " ", text)
        return text.strip()

    def analyze_sentiment(self, text):
        text = self.clean_text(text)
        positive_count = sum(1 for word in self.positive_words if word in text)
        negative_count = sum(1 for word in self.negative_words if word in text)
        if positive_count > negative_count:
            return "positive"
        elif negative_count > positive_count:
            return "negative"
        else:
            return "neutral"

    def generate_report(self, notes_data):
        sentiments = [self.analyze_sentiment(n.get("content", "")) for n in notes_data]
        sentiment_dist = Counter(sentiments)
        return {
            "total": len(notes_data),
            "positive": sentiment_dist.get("positive", 0),
            "negative": sentiment_dist.get("negative", 0),
            "neutral": sentiment_dist.get("neutral", 0)
        }

analyzer = SentimentAnalyzer()
print(analyzer.analyze_sentiment("这款产品非常好用,非常推荐!"))
print(analyzer.analyze_sentiment("太差了,完全是假货,骗人的!"))

5. Conclusion

As a major gateway for social traffic, Xiaohongshu holds data value that is hard to ignore. When running sentiment monitoring programs, enterprises should, on a compliant footing, apply data techniques sensibly to surface the user voice behind the numbers and give brand decisions a scientific basis.

Note that the technical approaches in this article are intended for learning and research only. Any commercial data collection should first obtain the platform's authorization and comply with the Cybersecurity Law, the Data Security Law, the Personal Information Protection Law, and other applicable laws and regulations, so that data collection and use remain lawful and compliant.

The Jiayu (甲鱼) sentiment monitoring platform provides professional, compliant monitoring solutions covering Xiaohongshu, Weibo, WeChat, Douyin, and other platforms, helping enterprises stay on top of public opinion and strengthen brand management.

Related reading:

  • "A Hands-On Guide to Integrating Sentiment Monitoring with API Data"
  • "Content Collection Techniques for WeChat Channels, Explained"
  • "Anti-Crawling Mechanisms and Data Security Protection Strategies"