Skip to content

プラグインのサンプル

ファイル処理プラグイン

ライフサイクル管理とスケジュールクリーンアップを備えたファイル処理プラグインです。

python
import os
import shutil
from pathlib import Path
from typing import Any, Optional
from plugin.sdk.base import NekoPluginBase
from plugin.sdk.decorators import (
    neko_plugin, plugin_entry, lifecycle, timer_interval
)

@neko_plugin
class FileProcessorPlugin(NekoPluginBase):
    def __init__(self, ctx: Any):
        super().__init__(ctx)
        self.logger = ctx.logger
        self.work_dir = Path("/tmp/file_processor")
        self.work_dir.mkdir(exist_ok=True)
        self.processed_count = 0

    @lifecycle(id="startup")
    def startup(self, **_):
        self.logger.info("FileProcessorPlugin starting...")
        self.report_status({"status": "initialized"})
        return {"status": "ready"}

    @lifecycle(id="shutdown")
    def shutdown(self, **_):
        if self.work_dir.exists():
            shutil.rmtree(self.work_dir)
        return {"status": "stopped"}

    @plugin_entry(
        id="process_file",
        name="Process File",
        input_schema={
            "type": "object",
            "properties": {
                "file_path": {"type": "string"},
                "operation": {
                    "type": "string",
                    "enum": ["compress", "extract", "convert"]
                }
            },
            "required": ["file_path", "operation"]
        }
    )
    def process_file(self, file_path: str, operation: str, **_):
        self.report_status({"status": "processing", "file": file_path})

        try:
            # ... 処理ロジック ...
            self.processed_count += 1
            self.ctx.push_message(
                source="file_processor",
                message_type="text",
                description="File processed",
                priority=6,
                content=f"Processed {file_path}",
            )
            return {"success": True}
        except Exception as e:
            self.logger.exception(f"Error: {e}")
            return {"success": False, "error": str(e)}

    @timer_interval(id="cleanup", seconds=3600, auto_start=True)
    def cleanup(self, **_):
        self.logger.info("Cleaning temporary files...")
        return {"cleaned": True}

非同期 API クライアントプラグイン

非同期サポートとバッチ操作を備えた外部 API を呼び出すプラグインです。

python
import asyncio
import aiohttp
from typing import Any, Optional, Dict
from plugin.sdk.base import NekoPluginBase
from plugin.sdk.decorators import neko_plugin, plugin_entry, lifecycle

@neko_plugin
class APIClientPlugin(NekoPluginBase):
    def __init__(self, ctx: Any):
        super().__init__(ctx)
        self.logger = ctx.logger
        self.session: Optional[aiohttp.ClientSession] = None
        self.base_url = "https://api.example.com"

    @lifecycle(id="startup")
    async def startup(self, **_):
        self.session = aiohttp.ClientSession()
        return {"status": "ready"}

    @lifecycle(id="shutdown")
    async def shutdown(self, **_):
        if self.session:
            await self.session.close()
        return {"status": "stopped"}

    @plugin_entry(
        id="fetch_data",
        name="Fetch Data",
        input_schema={
            "type": "object",
            "properties": {
                "endpoint": {"type": "string"},
                "method": {
                    "type": "string",
                    "enum": ["GET", "POST"],
                    "default": "GET"
                }
            },
            "required": ["endpoint"]
        }
    )
    async def fetch_data(self, endpoint: str, method: str = "GET", **_):
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        async with self.session.request(method, url) as response:
            data = await response.json()
            return {"success": True, "data": data}

    @plugin_entry(
        id="batch_fetch",
        input_schema={
            "type": "object",
            "properties": {
                "endpoints": {
                    "type": "array",
                    "items": {"type": "string"}
                },
                "concurrent": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 10,
                    "default": 3
                }
            },
            "required": ["endpoints"]
        }
    )
    async def batch_fetch(self, endpoints: list, concurrent: int = 3, **_):
        semaphore = asyncio.Semaphore(concurrent)

        async def fetch_one(ep):
            async with semaphore:
                return await self.fetch_data(ep)

        results = await asyncio.gather(
            *[fetch_one(ep) for ep in endpoints],
            return_exceptions=True
        )
        success = sum(1 for r in results if isinstance(r, dict) and r.get("success"))
        return {"success_count": success, "total": len(endpoints)}

永続化付きデータコレクター

python
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Dict
from plugin.sdk.base import NekoPluginBase
from plugin.sdk.decorators import (
    neko_plugin, plugin_entry, lifecycle, timer_interval
)

@neko_plugin
class DataCollectorPlugin(NekoPluginBase):
    def __init__(self, ctx: Any):
        super().__init__(ctx)
        self.logger = ctx.logger
        self.data_dir = ctx.config_path.parent / "data"
        self.data_dir.mkdir(exist_ok=True)
        self.collection_count = 0

    @plugin_entry(
        id="collect",
        input_schema={
            "type": "object",
            "properties": {
                "source": {"type": "string"}
            },
            "required": ["source"]
        }
    )
    def collect(self, source: str, **_):
        data = self._fetch_data(source)

        filename = f"{source}_{datetime.now().isoformat()}.json"
        filepath = self.data_dir / filename
        filepath.write_text(json.dumps(data, indent=2))

        self.collection_count += 1
        self.ctx.push_message(
            source="collector",
            message_type="text",
            priority=5,
            content=f"Collected {len(data)} records from {source}",
        )
        return {"count": len(data), "file": filename}

    @timer_interval(id="auto_collect", seconds=300, auto_start=True)
    def auto_collect(self, **_):
        for source in ["source1", "source2"]:
            self.collect(source=source)

    @plugin_entry(id="stats")
    def stats(self, **_):
        files = list(self.data_dir.glob("*.json"))
        return {"collection_count": self.collection_count, "files": len(files)}

    def _fetch_data(self, source):
        # 実際のデータ取得ロジックに置き換えてください
        return [{"id": 1, "source": source}]

MIT ライセンスの下で公開。