连接本地MongoDB

openclaw AI小龙虾攻略 2

AI小龙虾OPENCLAWMongoDB 集成,可以通过以下步骤实现数据存储、检索和管理,以下是具体方法和代码示例:

连接本地MongoDB-第1张图片-官方获取 | OpenClaw下载 - openclaw官网


环境准备

安装依赖

pip install pymongo

连接 MongoDB

基本连接配置

from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
# 连接远程MongoDB(需认证)
# client = MongoClient('mongodb://username:password@host:port/')
# 选择数据库
db = client['openclaw_db']
# 选择集合(类似表)
collection = db['claw_data']

数据模型设计

根据 OPENCLAW 的业务需求设计文档结构,

claw_document = {
    "task_id": "12345",
    "timestamp": "2023-10-01T12:00:00Z",
    "input_data": {"url": "https://example.com", "depth": 2},
    "output_data": {
        "crawled_pages": [
            {"url": "https://example.com/page1", "content": "..."},
            {"url": "https://example.com/page2", "content": "..."}
        ],
        "metadata": {"page_count": 10, "status": "completed"}
    },
    "status": "completed",
    "processing_time": 5.2
}

核心操作示例

插入数据

# 插入单条数据
result = collection.insert_one(claw_document)
print(f"插入ID: {result.inserted_id}")
# 批量插入
documents = [doc1, doc2, doc3]
result = collection.insert_many(documents)

查询数据

# 查询所有数据
for doc in collection.find():
    print(doc)
# 条件查询
query = {"status": "completed"}
results = collection.find(query)
# 查询特定字段
results = collection.find({}, {"task_id": 1, "timestamp": 1})
# 查询单条记录
doc = collection.find_one({"task_id": "12345"})

更新数据

# 更新单条
collection.update_one(
    {"task_id": "12345"},
    {"$set": {"status": "failed", "error": "timeout"}}
)
# 批量更新
collection.update_many(
    {"status": "pending"},
    {"$set": {"status": "processing"}}
)

删除数据

# 删除单条
collection.delete_one({"task_id": "12345"})
# 批量删除
collection.delete_many({"status": "failed"})

聚合查询

pipeline = [
    {"$match": {"status": "completed"}},
    {"$group": {"_id": None, "avg_time": {"$avg": "$processing_time"}}}
]
result = list(collection.aggregate(pipeline))

高级集成方案

1 异步支持(使用 Motor)

# 安装异步驱动
# pip install motor
import motor.motor_asyncio
import asyncio
async def async_example():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.openclaw_db
    collection = db.claw_data
    # 异步插入
    await collection.insert_one(claw_document)
    # 异步查询
    cursor = collection.find({"status": "completed"})
    async for doc in cursor:
        print(doc)

2 数据索引优化

# 创建索引提升查询性能
collection.create_index("task_id", unique=True)  # 唯一索引
collection.create_index([("timestamp", -1)])  # 时间倒序索引
collection.create_index([("status", 1), ("timestamp", -1)])  # 复合索引

3 错误处理与重连

from pymongo.errors import ConnectionFailure, OperationFailure
def safe_operation():
    try:
        client = MongoClient('mongodb://localhost:27017/', 
                            serverSelectionTimeoutMS=5000)
        client.server_info()  # 触发连接测试
        # 执行操作
        collection.insert_one(data)
    except ConnectionFailure as e:
        print(f"连接失败: {e}")
        # 重试逻辑
    except OperationFailure as e:
        print(f"操作失败: {e}")

4 数据备份与恢复

# 备份指定集合
mongodump --db openclaw_db --collection claw_data --out ./backup/
# 恢复数据
mongorestore --db openclaw_db ./backup/openclaw_db/

完整示例:OPENCLAW 数据处理器

class MongoDBStorage:
    def __init__(self, uri='mongodb://localhost:27017/'):
        self.client = MongoClient(uri)
        self.db = self.client['openclaw_db']
        self.collection = self.db['crawl_results']
    def save_crawl_result(self, task_id, url, content, metadata=None):
        document = {
            "task_id": task_id,
            "url": url,
            "content": content,
            "metadata": metadata or {},
            "crawled_at": datetime.now(),
            "processed": False
        }
        return self.collection.insert_one(document)
    def get_pending_tasks(self, limit=100):
        return list(self.collection.find(
            {"processed": False},
            limit=limit
        ))
    def mark_as_processed(self, task_id):
        self.collection.update_one(
            {"task_id": task_id},
            {"$set": {"processed": True}}
        )
    def get_stats(self):
        pipeline = [
            {"$group": {
                "_id": None,
                "total": {"$sum": 1},
                "processed": {"$sum": {"$cond": ["$processed", 1, 0]}},
                "avg_length": {"$avg": {"$strLenCP": "$content"}}
            }}
        ]
        return self.collection.aggregate(pipeline)
# 使用示例
storage = MongoDBStorage()
storage.save_crawl_result("task_001", "https://example.com", "<html>...")
pending = storage.get_pending_tasks()
stats = storage.get_stats()

最佳实践建议

  1. 连接池管理:复用 MongoDB 连接,避免频繁创建连接
  2. 数据分片:对于大数据量,考虑分片存储
  3. 读写分离:利用 MongoDB 的副本集实现读写分离
  4. 数据清理:设置 TTL 索引自动清理旧数据
    collection.create_index("timestamp", expireAfterSeconds=604800)  # 7天过期
  5. 监控告警:监控集合大小、查询性能等指标

故障排查

# 查看集合统计信息
print(db.command("collstats", "claw_data"))
# 查看当前操作
print(db.current_op())
# 检查索引使用情况
print(collection.index_information())

按照以上步骤,你可以将 OPENCLAW 与 MongoDB 高效集成,实现灵活的数据存储和检索功能,根据实际业务需求调整数据模型和查询逻辑。

标签: 本地连接 MongoDB操作

抱歉,评论功能暂时关闭!