# Plugin Examples

## File processor plugin

A plugin that processes files, with lifecycle hooks for setup and teardown and a timer entry that runs cleanup on a schedule.

```python
import shutil
from pathlib import Path
from typing import Any
from plugin.sdk.base import NekoPluginBase
from plugin.sdk.decorators import (
    neko_plugin, plugin_entry, lifecycle, timer_interval
)

@neko_plugin
class FileProcessorPlugin(NekoPluginBase):
    def __init__(self, ctx: Any):
        super().__init__(ctx)
        self.logger = ctx.logger
        self.work_dir = Path("/tmp/file_processor")
        self.work_dir.mkdir(exist_ok=True)
        self.processed_count = 0

    @lifecycle(id="startup")
    def startup(self, **_):
        self.logger.info("FileProcessorPlugin starting...")
        self.report_status({"status": "initialized"})
        return {"status": "ready"}

    @lifecycle(id="shutdown")
    def shutdown(self, **_):
        if self.work_dir.exists():
            shutil.rmtree(self.work_dir)
        return {"status": "stopped"}

    @plugin_entry(
        id="process_file",
        name="Process File",
        input_schema={
            "type": "object",
            "properties": {
                "file_path": {"type": "string"},
                "operation": {
                    "type": "string",
                    "enum": ["compress", "extract", "convert"]
                }
            },
            "required": ["file_path", "operation"]
        }
    )
    def process_file(self, file_path: str, operation: str, **_):
        self.report_status({"status": "processing", "file": file_path})

        try:
            # ... processing logic ...
            self.processed_count += 1
            self.ctx.push_message(
                source="file_processor",
                message_type="text",
                description="File processed",
                priority=6,
                content=f"Processed {file_path}",
            )
            return {"success": True}
        except Exception as e:
            self.logger.exception("Failed to process %s", file_path)
            return {"success": False, "error": str(e)}

    @timer_interval(id="cleanup", seconds=3600, auto_start=True)
    def cleanup(self, **_):
        self.logger.info("Cleaning temporary files...")
        return {"cleaned": True}

## Async API client plugin

A plugin that calls an external HTTP API from async handlers, including a batched fetch with bounded concurrency.

```python
import asyncio
import aiohttp
from typing import Any, Optional
from plugin.sdk.base import NekoPluginBase
from plugin.sdk.decorators import neko_plugin, plugin_entry, lifecycle

@neko_plugin
class APIClientPlugin(NekoPluginBase):
    def __init__(self, ctx: Any):
        super().__init__(ctx)
        self.logger = ctx.logger
        self.session: Optional[aiohttp.ClientSession] = None
        self.base_url = "https://api.example.com"

    @lifecycle(id="startup")
    async def startup(self, **_):
        self.session = aiohttp.ClientSession()
        return {"status": "ready"}

    @lifecycle(id="shutdown")
    async def shutdown(self, **_):
        if self.session:
            await self.session.close()
        return {"status": "stopped"}

    @plugin_entry(
        id="fetch_data",
        name="Fetch Data",
        input_schema={
            "type": "object",
            "properties": {
                "endpoint": {"type": "string"},
                "method": {
                    "type": "string",
                    "enum": ["GET", "POST"],
                    "default": "GET"
                }
            },
            "required": ["endpoint"]
        }
    )
    async def fetch_data(self, endpoint: str, method: str = "GET", **_):
        if self.session is None:
            return {"success": False, "error": "session not initialized"}
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        async with self.session.request(method, url) as response:
            response.raise_for_status()
            data = await response.json()
            return {"success": True, "data": data}

    @plugin_entry(
        id="batch_fetch",
        input_schema={
            "type": "object",
            "properties": {
                "endpoints": {
                    "type": "array",
                    "items": {"type": "string"}
                },
                "concurrent": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 10,
                    "default": 3
                }
            },
            "required": ["endpoints"]
        }
    )
    async def batch_fetch(self, endpoints: list, concurrent: int = 3, **_):
        semaphore = asyncio.Semaphore(concurrent)

        async def fetch_one(ep):
            async with semaphore:
                return await self.fetch_data(ep)

        results = await asyncio.gather(
            *[fetch_one(ep) for ep in endpoints],
            return_exceptions=True
        )
        success = sum(1 for r in results if isinstance(r, dict) and r.get("success"))
        return {"success_count": success, "total": len(endpoints)}

## Data collector with persistence
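
A plugin that collects records on demand or on a timer and persists each batch as a timestamped JSON file next to the plugin's configuration.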

```python
import json
from datetime import datetime
from pathlib import Path
from typing import Any
from plugin.sdk.base import NekoPluginBase
from plugin.sdk.decorators import (
    neko_plugin, plugin_entry, lifecycle, timer_interval
)

@neko_plugin
class DataCollectorPlugin(NekoPluginBase):
    def __init__(self, ctx: Any):
        super().__init__(ctx)
        self.logger = ctx.logger
        self.data_dir = ctx.config_path.parent / "data"
        self.data_dir.mkdir(exist_ok=True)
        self.collection_count = 0

    @plugin_entry(
        id="collect",
        input_schema={
            "type": "object",
            "properties": {
                "source": {"type": "string"}
            },
            "required": ["source"]
        }
    )
    def collect(self, source: str, **_):
        data = self._fetch_data(source)

        filename = f"{source}_{datetime.now().isoformat()}.json"
        filepath = self.data_dir / filename
        filepath.write_text(json.dumps(data, indent=2))

        self.collection_count += 1
        self.ctx.push_message(
            source="collector",
            message_type="text",
            priority=5,
            content=f"Collected {len(data)} records from {source}",
        )
        return {"count": len(data), "file": filename}

    @timer_interval(id="auto_collect", seconds=300, auto_start=True)
    def auto_collect(self, **_):
        for source in ["source1", "source2"]:
            self.collect(source=source)

    @plugin_entry(id="stats")
    def stats(self, **_):
        files = list(self.data_dir.glob("*.json"))
        return {"collection_count": self.collection_count, "files": len(files)}

    def _fetch_data(self, source):
        # Replace with actual data fetching logic
        return [{"id": 1, "source": source}]
