舆情监测实战:如何合规采集小红书数据构建品牌口碑分析体系
在当今数字化营销时代,小红书已成为品牌口碑传播的核心阵地。每天有数百万用户在平台上分享消费体验、产品测评和生活方式内容,这些数据对于企业舆情监测和市场研究具有极高的价值。本文将详细介绍如何合规采集小红书数据,构建完善的品牌口碑分析体系。
一、小红书数据在舆情监测中的价值
小红书(Xiaohongshu)作为国内领先的生活方式分享平台,其用户群体以一二线城市的年轻女性为主,消费能力强,内容活跃度高。平台上的笔记内容涵盖美妆护肤、穿搭时尚、母婴育儿、旅行出游、餐饮美食等多个领域,已成为消费者决策的重要参考来源。
对于企业而言,小红书数据监测具有以下核心价值:
1. 口碑预警:通过监测品牌相关笔记,企业可以第一时间发现负面舆情,及时响应处理,避免危机扩散。
2. 竞品分析:追踪竞品在小红书上的声量、内容类型、用户反馈,为制定营销策略提供数据支撑。
3. KOL筛选:通过数据分析识别高性价比的腰部博主和素人用户,构建高效的种草矩阵。
4. 趋势洞察:分析热门话题和内容趋势,把握市场风向,指导产品研发和营销策划。
二、小红书数据采集的技术原理
小红书采用了多种反爬虫技术来保护平台数据安全,包括参数签名验证、请求频率限制、UA检测、IP限制等。合规的数据采集需要在遵守平台规则的前提下进行,以下介绍几种常见的技术方案。
2.1 基于官方API的数据获取
小红书开放了部分API接口,开发者可以通过申请接口权限获取数据。这种方式最为合规,数据质量也最高。
import requests
import json
import time
import hashlib
import random
class XiaohongshuAPI:
    """Thin client for the Xiaohongshu open-platform REST API.

    Handles OAuth client-credential token acquisition (with in-memory
    caching) and exposes the search / note-detail / user-notes endpoints.
    """

    def __init__(self, app_id, app_secret):
        self.app_id = app_id
        self.app_secret = app_secret
        self.access_token = None     # cached bearer token, None until first fetch
        self.token_expires_at = 0    # epoch seconds after which the token is stale

    def get_access_token(self):
        """Return a valid access token, refreshing it only when expired."""
        if self.access_token and time.time() < self.token_expires_at:
            return self.access_token
        response = requests.post(
            "https://open.xiaohongshu.com/oauth/access_token",
            data={
                "app_id": self.app_id,
                "app_secret": self.app_secret,
                "grant_type": "client_credential",
            },
            timeout=10,
        )
        result = response.json()
        if result.get("code") != 0:
            raise Exception(f"获取Token失败: {result}")
        token_data = result["data"]
        self.access_token = token_data["access_token"]
        # Expire 5 minutes early so we never send an about-to-expire token.
        self.token_expires_at = time.time() + token_data["expires_in"] - 300
        return self.access_token

    def _auth_headers(self, json_body=False):
        """Build request headers carrying the bearer token."""
        headers = {"Authorization": f"Bearer {self.get_access_token()}"}
        if json_body:
            headers["Content-Type"] = "application/json"
        return headers

    def search_notes(self, keyword, page=1, page_size=20):
        """Search notes by keyword; returns the parsed JSON response."""
        body = {
            "keyword": keyword,
            "page": page,
            "page_size": page_size,
            "sort": "general",
        }
        response = requests.post(
            "https://open.xiaohongshu.com/api/notes/search",
            headers=self._auth_headers(json_body=True),
            json=body,
            timeout=10,
        )
        return response.json()

    def get_note_detail(self, note_id):
        """Fetch one note's detail by its id."""
        response = requests.get(
            f"https://open.xiaohongshu.com/api/notes/{note_id}",
            headers=self._auth_headers(),
            timeout=10,
        )
        return response.json()

    def get_user_notes(self, user_id, page=1):
        """Fetch one page (20 items) of a user's notes."""
        response = requests.get(
            f"https://open.xiaohongshu.com/api/users/{user_id}/notes",
            headers=self._auth_headers(),
            params={"page": page, "page_size": 20},
            timeout=10,
        )
        return response.json()
if __name__ == "__main__":
    # Demo: search for a keyword and show the first few hits.
    api = XiaohongshuAPI(app_id="YOUR_APP_ID", app_secret="YOUR_APP_SECRET")
    search_result = api.search_notes(keyword="新能源汽车", page=1)
    data = search_result["data"]
    print(f"搜索到 {data['total']} 篇笔记")
    for note in data["notes"][:5]:
        print(f"标题: {note['title']}, 点赞: {note['liked_count']}")
2.2 基于Selenium的模拟浏览器采集
对于需要模拟登录或复杂交互的场景,可以使用Selenium模拟真实浏览器操作。这种方式可以绕过部分反爬虫检测,但需要控制请求频率。
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import time
import random
class XiaohongshuCrawler:
    """Selenium-driven Xiaohongshu note scraper.

    Drives an (optionally headless) Chrome instance with basic
    anti-automation-detection tweaks, scrolls search result pages to trigger
    lazy loading, and extracts note cards with BeautifulSoup.
    """

    def __init__(self, headless=True):
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--window-size=1920,1080")
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36"
        chrome_options.add_argument(f"user-agent={user_agent}")
        # Hide the automation banner and the navigator.webdriver flag that
        # sites commonly probe for bot detection.
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        self.driver = webdriver.Chrome(options=chrome_options)
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
        })

    def wait_random(self, min_sec=1, max_sec=3):
        """Sleep a random interval to mimic human pacing."""
        time.sleep(random.uniform(min_sec, max_sec))

    def scroll_page(self, scroll_count=3):
        """Scroll to the bottom `scroll_count` times to trigger lazy loading."""
        for _ in range(scroll_count):
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            self.wait_random(1, 2)
            # A small upward nudge helps some infinite-scroll pages fire.
            self.driver.execute_script("window.scrollBy(0, -200);")
            self.wait_random(0.5, 1)

    def search(self, keyword, max_notes=50):
        """Search notes for `keyword` and return up to `max_notes` dicts.

        Each dict has "title", "author" and "likes" keys. Scrolling stops
        when enough notes are collected or the page height stops growing.
        """
        search_url = f"https://www.xiaohongshu.com/search_result?keyword={keyword}&type=51"
        self.driver.get(search_url)
        self.wait_random(2, 4)
        notes = []
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        while len(notes) < max_notes:
            self.scroll_page(scroll_count=2)
            soup = BeautifulSoup(self.driver.page_source, "html.parser")
            # BUG FIX: the page accumulates cards as it scrolls, so the full
            # card list must REPLACE the collected set each pass; the original
            # appended every pass and duplicated earlier cards.
            notes = []
            for card in soup.select(".note-item"):
                try:
                    notes.append({
                        "title": card.select_one(".title").text.strip(),
                        "author": card.select_one(".user-name").text.strip(),
                        "likes": card.select_one(".like-count").text.strip(),
                    })
                except AttributeError:
                    # BUG FIX: was a bare `except:` which also swallowed
                    # KeyboardInterrupt/SystemExit; only a missing sub-element
                    # (select_one -> None) should skip a card.
                    continue
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break  # no new content loaded; stop scrolling
            last_height = new_height
        return notes[:max_notes]

    def close(self):
        """Shut down the browser and release its resources."""
        self.driver.quit()
if __name__ == "__main__":
    # Demo: collect up to 30 notes for one keyword, always closing the browser.
    crawler = XiaohongshuCrawler(headless=True)
    try:
        collected = crawler.search("面膜推荐", max_notes=30)
        print(f"采集到 {len(collected)} 篇笔记")
        for note in collected:
            print(f"- {note['title']} | 作者: {note['author']} | 点赞: {note['likes']}")
    finally:
        crawler.close()
三、合规数据采集的最佳实践
3.1 遵守robots.txt协议
在采集前,首先检查小红书的robots.txt文件,了解平台的采集规则和限制。
import urllib.robotparser
def check_robots_txt(base_url):
    """Fetch <base_url>/robots.txt and report its crawl permissions.

    Prints whether the site root may be fetched by a generic agent and by
    Googlebot, plus any Sitemap URLs the file declares. Network or parse
    failures are reported, not raised.
    """
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(f"{base_url}/robots.txt")
    try:
        rp.read()
        print(f"允许抓取: {rp.can_fetch('*', '/')}")
        print(f"允许Googlebot: {rp.can_fetch('Googlebot', '/')}")
        # BUG FIX: RobotFileParser has no raw() method — the original call
        # always raised AttributeError and fell into the except branch.
        # site_maps() (Python 3.8+) returns the declared Sitemap URLs or None.
        sitemaps = rp.site_maps()
        if sitemaps:
            for sitemap_url in sitemaps:
                print(f"Sitemap: {sitemap_url}")
    except Exception as e:
        print(f"读取robots.txt失败: {e}")

check_robots_txt("https://www.xiaohongshu.com")
3.2 控制请求频率
过高频率的请求会对目标服务器造成压力,也可能触发反爬虫机制。建议在每次请求间设置合理的延迟。
import time
import threading
from collections import defaultdict
class RateLimiter:
    """Token-bucket rate limiter: at most `rate` acquisitions per `per` seconds.

    Thread-safe; keeps an independent bucket per string key so different
    endpoints can be throttled separately. `acquire` blocks (sleeps) until
    a token is available and always returns True.
    """

    def __init__(self, rate=5, per=60):
        self.rate = rate
        self.per = per
        # BUG FIX: buckets now start FULL so the first `rate` calls pass
        # immediately. The original defaulted the allowance to 0, which made
        # the very first acquire() sleep per/rate seconds (12 s with defaults).
        self.allowance = defaultdict(lambda: float(rate))
        # First access records "now", so elapsed time starts from first use.
        self.last_check = defaultdict(time.time)
        self.lock = threading.Lock()

    def acquire(self, key="default"):
        """Take one token for `key`, sleeping if the bucket is empty."""
        with self.lock:
            current = time.time()
            time_passed = current - self.last_check[key]
            self.last_check[key] = current
            # Refill proportionally to elapsed time, capped at bucket size.
            self.allowance[key] = min(
                self.rate,
                self.allowance[key] + time_passed * (self.rate / self.per),
            )
            if self.allowance[key] < 1.0:
                sleep_time = (1.0 - self.allowance[key]) * (self.per / self.rate)
                time.sleep(sleep_time)
                self.allowance[key] = 0
            else:
                self.allowance[key] -= 1.0
            return True
# Demo: ten throttled requests at 5 per minute.
limiter = RateLimiter(rate=5, per=60)
for attempt in range(1, 11):
    limiter.acquire()
    print(f"第{attempt}次请求完成")
3.3 数据存储与安全
import sqlite3
import hashlib
from datetime import datetime
class DataStorage:
    """SQLite-backed persistence for crawled notes and author profiles.

    A fresh connection is opened per call and closed in a ``finally`` block,
    so a failed statement cannot leak file handles.
    """

    def __init__(self, db_path="xiaohongshu_data.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Create the notes/authors tables and indexes if absent (idempotent)."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS notes (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    note_id TEXT UNIQUE, title TEXT, content TEXT,
                    author TEXT, author_id TEXT,
                    liked_count INTEGER, collected_count INTEGER,
                    commented_count INTEGER, topics TEXT,
                    published_at TEXT, crawled_at TEXT, keyword TEXT, md5 TEXT
                )
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS authors (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    author_id TEXT UNIQUE, nickname TEXT,
                    followers INTEGER, verified BOOLEAN, crawled_at TEXT
                )
            """)
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_id ON notes(note_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_author ON notes(author_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_keyword ON notes(keyword)")
            conn.commit()
        finally:
            # BUG FIX: the original never closed this connection on error.
            conn.close()

    def save_note(self, note_data):
        """Insert or replace one note row keyed on its unique note_id.

        The md5 column fingerprints (note_id + title) for downstream dedup.
        Missing count fields default to 0; missing text fields to NULL.
        """
        md5_hash = hashlib.md5(
            (note_data.get("note_id", "") + note_data.get("title", "")).encode()
        ).hexdigest()
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT OR REPLACE INTO notes (
                    note_id, title, content, author, author_id,
                    liked_count, collected_count, commented_count,
                    topics, published_at, crawled_at, keyword, md5
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                note_data.get("note_id"), note_data.get("title"),
                note_data.get("content"), note_data.get("author"),
                note_data.get("author_id"), note_data.get("liked_count", 0),
                note_data.get("collected_count", 0), note_data.get("commented_count", 0),
                ",".join(note_data.get("topics", [])), note_data.get("published_at"),
                datetime.now().isoformat(), note_data.get("keyword"), md5_hash
            ))
            conn.commit()
        finally:
            conn.close()

    def get_stats(self):
        """Return {"total_notes", "total_authors"} counts from the notes table."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM notes")
            total_notes = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(DISTINCT author_id) FROM notes")
            total_authors = cursor.fetchone()[0]
            return {"total_notes": total_notes, "total_authors": total_authors}
        finally:
            # BUG FIX: close in finally so a failed query can't leak the handle.
            conn.close()
# Demo: open (creating if needed) the project database and report its size.
storage = DataStorage("jiayus_yuqing.db")
stats = storage.get_stats()
print("总笔记数: " + str(stats["total_notes"]))
四、舆情监测系统的构建
采集到原始数据后,需要通过数据清洗、分析和可视化,构建完整的舆情监测系统。
import pandas as pd
from collections import Counter
import re
class SentimentAnalyzer:
    """Keyword-lexicon sentiment classifier for short Chinese texts.

    Counts substring hits from fixed positive/negative word lists and labels
    a text by whichever side has more hits ("neutral" on a tie).
    """

    def __init__(self):
        self.positive_words = {
            "优秀", "完美", "惊艳", "推荐", "值得", "好用", "喜欢",
            "超赞", "非常", "满意", "良心", "靠谱", "棒", "赞",
        }
        self.negative_words = {
            "失望", "差", "坑", "骗", "烂", "难用", "后悔", "假货",
            "过敏", "投诉", "维权", "虚假", "欺骗", "垃圾", "差评",
        }

    def clean_text(self, text):
        """Replace everything except CJK, ASCII letters and digits with spaces."""
        if not text:
            return ""
        return re.sub(r"[^\u4e00-\u9fa5a-zA-Z0-9]", " ", text).strip()

    def analyze_sentiment(self, text):
        """Return "positive", "negative" or "neutral" for one text."""
        cleaned = self.clean_text(text)
        pos_hits = sum(word in cleaned for word in self.positive_words)
        neg_hits = sum(word in cleaned for word in self.negative_words)
        if pos_hits > neg_hits:
            return "positive"
        if neg_hits > pos_hits:
            return "negative"
        return "neutral"

    def generate_report(self, notes_data):
        """Tally sentiment labels over a list of note dicts (uses "content")."""
        label_counts = Counter(
            self.analyze_sentiment(note.get("content", "")) for note in notes_data
        )
        return {
            "total": len(notes_data),
            "positive": label_counts.get("positive", 0),
            "negative": label_counts.get("negative", 0),
            "neutral": label_counts.get("neutral", 0),
        }
# Demo: classify one clearly positive and one clearly negative sample.
analyzer = SentimentAnalyzer()
sample_texts = [
    "这款产品非常好用,非常推荐!",
    "太差了,完全是假货,骗人的!",
]
for sample in sample_texts:
    print(analyzer.analyze_sentiment(sample))
五、结语
小红书作为重要的社交流量入口,其数据价值不容忽视。企业在开展舆情监测工作时,应当在合规的前提下,合理利用数据技术,挖掘数据背后的用户声音,为品牌决策提供科学依据。
值得注意的是,本文介绍的技术方案仅供学习和研究使用。任何商业化的数据采集行为,都应当事先取得平台授权,遵守《网络安全法》《数据安全法》《个人信息保护法》等相关法律法规,确保数据采集和使用的合法合规。
甲鱼舆情监测平台致力于为企业提供专业、合规的舆情监测解决方案,支持小红书、微博、微信、抖音等多平台数据监测,助力企业把握舆情动态,提升品牌管理能力。
相关阅读:
- 《舆情监测与API接口数据对接实战指南》
- 《微信视频号内容采集技术方案详解》
- 《网站反爬虫机制与数据安全保护策略》