使用示例

openclaw AI小龙虾攻略 2

我来帮你设计一个具备数据保留功能的 OpenClaw(机械爪)类,以下是一个完整的实现方案:

使用示例-第1张图片-官方获取 | OpenClaw下载 - openclaw官网

import json
import pickle
import time
from datetime import datetime
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Optional, Any
from enum import Enum
class ClawState(Enum):
    """机械爪状态枚举"""
    OPEN = "open"
    CLOSED = "closed"
    HALF_OPEN = "half_open"
    ERROR = "error"
@dataclass
class OperationRecord:
    """操作记录数据类"""
    timestamp: float
    datetime: str
    action: str
    position: tuple
    force: float
    state: ClawState
    success: bool
    metadata: Dict[str, Any] = field(default_factory=dict)
class OpenClaw:
    """
    具备数据保留功能的智能机械爪
    特点:
    1. 自动记录所有操作历史
    2. 支持多种数据持久化方式
    3. 可配置的数据保留策略
    4. 实时状态监控
    """
    def __init__(self, 
                 claw_id: str = "default_claw",
                 max_history: int = 1000,
                 auto_save: bool = True,
                 save_format: str = "json"):
        """
        初始化机械爪
        Args:
            claw_id: 机械爪唯一标识
            max_history: 最大历史记录数(0表示无限制)
            auto_save: 是否自动保存
            save_format: 保存格式(json/pickle)
        """
        self.claw_id = claw_id
        self.max_history = max_history
        self.auto_save = auto_save
        self.save_format = save_format
        # 当前状态
        self.state = ClawState.CLOSED
        self.position = (0.0, 0.0, 0.0)  # (x, y, z)
        self.current_force = 0.0
        self.is_active = False
        # 数据存储
        self._operation_history: List[OperationRecord] = []
        self._config = {
            "max_force": 50.0,  # 最大抓取力(N)
            "min_force": 0.5,   # 最小抓取力(N)
            "speed": 1.0,       # 移动速度
            "sensitivity": 0.1, # 灵敏度
        }
        # 统计数据
        self._stats = {
            "total_operations": 0,
            "successful_operations": 0,
            "failed_operations": 0,
            "total_force_applied": 0.0,
            "last_maintenance": time.time(),
        }
        # 加载历史数据(如果存在)
        self._load_history()
    def open(self, force: float = 5.0) -> bool:
        """打开机械爪"""
        return self._execute_operation("open", force)
    def close(self, force: float = 10.0) -> bool:
        """关闭机械爪"""
        return self._execute_operation("close", force)
    def move_to(self, x: float, y: float, z: float) -> bool:
        """移动到指定位置"""
        old_position = self.position
        self.position = (x, y, z)
        record = OperationRecord(
            timestamp=time.time(),
            datetime=datetime.now().isoformat(),
            action="move",
            position=self.position,
            force=0.0,
            state=self.state,
            success=True,
            metadata={
                "from_position": old_position,
                "to_position": self.position,
                "distance": self._calculate_distance(old_position, self.position)
            }
        )
        self._add_record(record)
        return True
    def grip(self, target_force: float, object_type: str = "unknown") -> bool:
        """
        抓取物体
        Args:
            target_force: 目标抓取力
            object_type: 物体类型
        """
        if target_force > self._config["max_force"]:
            print(f"警告:目标抓取力 {target_force}N 超过最大限制 {self._config['max_force']}N")
            return False
        self.state = ClawState.CLOSED
        self.current_force = target_force
        record = OperationRecord(
            timestamp=time.time(),
            datetime=datetime.now().isoformat(),
            action="grip",
            position=self.position,
            force=target_force,
            state=self.state,
            success=True,
            metadata={
                "object_type": object_type,
                "force_applied": target_force,
                "config_used": self._config.copy()
            }
        )
        self._add_record(record)
        self._stats["total_force_applied"] += target_force
        return True
    def release(self) -> bool:
        """释放物体"""
        self.state = ClawState.OPEN
        self.current_force = 0.0
        record = OperationRecord(
            timestamp=time.time(),
            datetime=datetime.now().isoformat(),
            action="release",
            position=self.position,
            force=0.0,
            state=self.state,
            success=True,
            metadata={"force_before_release": self.current_force}
        )
        self._add_record(record)
        return True
    def _execute_operation(self, action: str, force: float) -> bool:
        """执行基础操作"""
        success = False
        try:
            if action == "open":
                self.state = ClawState.OPEN
                self.current_force = 0.0
                success = True
            elif action == "close":
                self.state = ClawState.CLOSED
                self.current_force = force
                success = True
            record = OperationRecord(
                timestamp=time.time(),
                datetime=datetime.now().isoformat(),
                action=action,
                position=self.position,
                force=force,
                state=self.state,
                success=success,
                metadata={"target_force": force}
            )
            self._add_record(record)
            if success:
                self._stats["successful_operations"] += 1
            else:
                self._stats["failed_operations"] += 1
            self._stats["total_operations"] += 1
        except Exception as e:
            record = OperationRecord(
                timestamp=time.time(),
                datetime=datetime.now().isoformat(),
                action=action,
                position=self.position,
                force=force,
                state=ClawState.ERROR,
                success=False,
                metadata={"error": str(e)}
            )
            self._add_record(record)
            self._stats["failed_operations"] += 1
            self._stats["total_operations"] += 1
        return success
    def _add_record(self, record: OperationRecord):
        """添加操作记录"""
        self._operation_history.append(record)
        # 限制历史记录数量
        if self.max_history > 0 and len(self._operation_history) > self.max_history:
            self._operation_history.pop(0)
        # 自动保存
        if self.auto_save:
            self.save_history()
    def save_history(self, filename: Optional[str] = None):
        """
        保存历史记录到文件
        Args:
            filename: 文件名(默认使用claw_id)
        """
        if filename is None:
            filename = f"{self.claw_id}_history"
        data = {
            "claw_id": self.claw_id,
            "config": self._config,
            "stats": self._stats,
            "current_state": {
                "position": self.position,
                "force": self.current_force,
                "state": self.state.value
            },
            "history": [asdict(record) for record in self._operation_history]
        }
        if self.save_format == "json":
            with open(f"{filename}.json", "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        elif self.save_format == "pickle":
            with open(f"{filename}.pkl", "wb") as f:
                pickle.dump(data, f)
    def _load_history(self):
        """加载历史记录"""
        filename = f"{self.claw_id}_history"
        try:
            if self.save_format == "json":
                with open(f"{filename}.json", "r", encoding="utf-8") as f:
                    data = json.load(f)
            elif self.save_format == "pickle":
                with open(f"{filename}.pkl", "rb") as f:
                    data = pickle.load(f)
            else:
                return
            # 更新配置和统计数据
            self._config = data.get("config", self._config)
            self._stats = data.get("stats", self._stats)
            # 加载历史记录
            history_data = data.get("history", [])
            for record_data in history_data:
                record_data["state"] = ClawState(record_data["state"])
                record = OperationRecord(**record_data)
                self._operation_history.append(record)
        except FileNotFoundError:
            print("未找到历史记录文件,创建新记录")
        except Exception as e:
            print(f"加载历史记录时出错: {e}")
    def get_history(self, 
                   start_time: Optional[float] = None,
                   end_time: Optional[float] = None,
                   action_filter: Optional[List[str]] = None,
                   min_force: Optional[float] = None) -> List[OperationRecord]:
        """
        获取历史记录
        Args:
            start_time: 开始时间戳
            end_time: 结束时间戳
            action_filter: 动作过滤器
            min_force: 最小力度
        Returns:
            过滤后的历史记录
        """
        filtered = self._operation_history
        if start_time:
            filtered = [r for r in filtered if r.timestamp >= start_time]
        if end_time:
            filtered = [r for r in filtered if r.timestamp <= end_time]
        if action_filter:
            filtered = [r for r in filtered if r.action in action_filter]
        if min_force is not None:
            filtered = [r for r in filtered if r.force >= min_force]
        return filtered
    def get_stats(self) -> Dict[str, Any]:
        """获取统计数据"""
        stats = self._stats.copy()
        stats.update({
            "history_size": len(self._operation_history),
            "uptime": time.time() - stats["last_maintenance"],
            "current_position": self.position,
            "current_state": self.state.value,
            "current_force": self.current_force,
        })
        return stats
    def clear_history(self, confirm: bool = False) -> bool:
        """清空历史记录"""
        if not confirm:
            print("请确认清空历史记录,调用时设置 confirm=True")
            return False
        self._operation_history.clear()
        print("历史记录已清空")
        return True
    def export_report(self, filename: str = "claw_report"):
        """导出详细报告"""
        stats = self.get_stats()
        recent_operations = self.get_history()[-10:]  # 最近10次操作
        report = {
            "report_generated": datetime.now().isoformat(),
            "claw_id": self.claw_id,
            "summary": {
                "total_operations": stats["total_operations"],
                "success_rate": (stats["successful_operations"] / stats["total_operations"] * 100 
                               if stats["total_operations"] > 0 else 0),
                "average_force": (stats["total_force_applied"] / stats["successful_operations"] 
                                if stats["successful_operations"] > 0 else 0),
                "uptime_hours": stats["uptime"] / 3600,
            },
            "config": self._config,
            "recent_operations": [asdict(op) for op in recent_operations],
            "performance_metrics": {
                "operations_per_hour": stats["total_operations"] / (stats["uptime"] / 3600) 
                                      if stats["uptime"] > 0 else 0,
                "error_rate": (stats["failed_operations"] / stats["total_operations"] * 100 
                             if stats["total_operations"] > 0 else 0),
            }
        }
        with open(f"{filename}.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        return report
    @staticmethod
    def _calculate_distance(pos1: tuple, pos2: tuple) -> float:
        """计算两点间距离"""
        return ((pos1[0] - pos2[0])**2 + 
                (pos1[1] - pos2[1])**2 + 
                (pos1[2] - pos2[2])**2) ** 0.5
    def __str__(self) -> str:
        """字符串表示"""
        return (f"OpenClaw(id={self.claw_id}, "
                f"state={self.state.value}, "
                f"position={self.position}, "
                f"force={self.current_force}N, "
                f"operations={self._stats['total_operations']})")
    def __enter__(self):
        """上下文管理器入口"""
        self.is_active = True
        print(f"{self.claw_id} 已激活")
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """上下文管理器退出"""
        self.is_active = False
        self.save_history()
        print(f"{self.claw_id} 已停用,数据已保存")
def demo_openclaw():
    """演示如何使用 OpenClaw"""
    # 创建机械爪实例
    claw = OpenClaw(
        claw_id="robot_arm_01",
        max_history=500,
        auto_save=True,
        save_format="json"
    )
    # 使用上下文管理器确保资源正确释放
    with claw:
        # 执行一系列操作
        claw.move_to(10.5, 5.2, 3.1)
        claw.grip(8.5, object_type="cube")
        claw.move_to(12.0, 6.0, 2.5)
        claw.release()
        # 获取统计数据
        stats = claw.get_stats()
        print("当前统计数据:", stats)
        # 获取最近5次操作
        recent_ops = claw.get_history()[-5:]
        print(f"len(recent_ops)}次操作:")
        for op in recent_ops:
            print(f"  - {op.datetime}: {op.action} at {op.position}")
        # 导出报告
        report = claw.export_report("demo_report")
        print(f"报告已生成,包含 {len(report['recent_operations'])} 条操作记录")
    # 保存最终状态
    claw.save_history()
if __name__ == "__main__":
    demo_openclaw()

主要功能特性:

数据保留功能

  • 自动记录所有操作的时间戳、位置、力度、状态
  • 支持操作元数据存储
  • 可配置的历史记录上限

多种持久化方式

  • JSON格式(人类可读)
  • Pickle格式(Python对象)
  • 自动加载上次保存的数据

智能查询功能

  • 按时间范围查询
  • 按操作类型过滤
  • 按力度阈值筛选

统计分析

  • 成功率统计
  • 平均力度计算
  • 运行时长统计
  • 性能指标分析

报告生成

  • 详细操作报告
  • 性能分析报告
  • 可导出为JSON文件

使用方法:

# 创建机械爪
claw = OpenClaw(claw_id="my_claw")
# 执行操作
claw.open()
claw.move_to(10, 20, 30)
claw.grip(15.0, object_type="bottle")
claw.release()
# 查询历史
history = claw.get_history(
    start_time=time.time() - 3600,  # 过去1小时
    action_filter=["grip", "release"]
)
# 获取统计
stats = claw.get_stats()
# 导出报告
claw.export_report()
# 手动保存
claw.save_history("backup_data")

这个设计具有很好的扩展性,可以根据具体需求添加更多传感器数据、故障记录、机器学习预测等功能。

标签: 使用 示例

抱歉,评论功能暂时关闭!