Developer

Connector Authoring Guide

How to build connectors for Kaduno Pullpush — AI-generated, manually coded, or JSON-defined.


Connector types

Type Where it lives How it's created
Builtin (code) packages/connectors/src/ Manually coded TypeScript
AI-generated (code) packages/connectors/src/generated/ pullpush.connector.generate MCP tool
Dynamic (JSON definition) Database (ConnectorDefinition table) pullpush.connector.define MCP tool or Admin UI
MCP protocol Database (protocol: mcp) JSON definition that delegates to an external MCP server

Canonical-aware. Connectors are built around the canonical data model. Source connectors emit canonical events (NormalizedEvent carries canonicalType/canonicalVersion); destinations consume canonical payloads in deliver(); and connectors should implement the optional snapshot(connection, opts): AsyncIterable<SnapshotItem> that yields current state in canonical space ({ key, canonicalType, fields }) — this is what powers sync-preview and the reconcile→deliver loop. A snapshot must distinguish a real 0 from "no data" (never emit 0 for a missing scope, or you will wipe destination stock).

Credentials are envelope-encrypted. connection.config is empty at rest; callers hydrate via await hydrateConnectionConfig(prisma, connection) before invoking a connector. Inside a connector, read connection.config (already hydrated by the caller) — never decrypt or log raw credentials yourself.


Builtin connectors (manual TypeScript)

Source connector interface

import type { SourceConnector, RawChange } from '@kaduno/connectors';

export class MySourceConnector implements SourceConnector {
  type = 'my-source';

  async *poll(connection, cursor): AsyncIterable<RawChange> {
    // Yield raw changes since `cursor` (ISO timestamp or page token)
    yield { type: 'item.updated', externalId: '123', payload: { ... }, cursor: '...' };
  }

  supportsWebhook(): boolean {
    return true; // or false
  }

  verifyWebhook(rawBody: string, signature: string, config: Record<string, unknown>): boolean {
    // HMAC verification
  }

  parseWebhook(rawBody: string, headers: Record<string, string>): RawChange[] {
    // Parse incoming webhook payload into raw changes
  }

  async test(connection): Promise<boolean> {
    // Verify credentials work (e.g. GET /api/status)
  }
}

Destination connector interface

import type { DestinationConnector, DeliveryResult } from '@kaduno/connectors';

export class MyDestinationConnector implements DestinationConnector {
  type = 'my-destination';

  async deliver(connection, payload, event): Promise<DeliveryResult> {
    // Send the transformed payload to the destination system
    // Return { success: true, latencyMs } or { success: false, retryable: true, errorMessage: '...' }
  }

  async test(connection): Promise<boolean> {
    // Verify credentials work
  }
}

Built-in connectors are classes implementing SourceConnector / DestinationConnector (see magento/, linnworks/, abicart/). The destination signature is deliver(connection, payload, event) and returns { success, errorCode?, errorMessage?, latencyMs? }. Add the optional snapshot() to make the connector previewable/reconcilable.

snapshot(connection, opts?) contract. Yield canonical SnapshotItems ({ key, canonicalType, fields }). opts.fieldHint is a scope hint (e.g. a target stock location). opts.onlyKeys (when present) restricts the snapshot to a specific set of canonical keys — emit at most those keys. A source that supports it should resolve channel-specific keys to the destination's keys (e.g. Linnworks resolves Abicart article numbers via master SKU first, then uses the preloaded channelSkuCache for instant channel-SKU lookups) so the diff lines up and a destination that carries a subset doesn't trigger a full-catalog fan-out. The pipeline derives onlyKeys from the destination snapshot when scopeToDestination/ scopeSourceToDestination is set (reconcile / preview). opts.channelSkuCache is auto-loaded from the ChannelSkuCache table (built by the channel-sku-cache background job every 12h) — connectors that support it get O(1) channel-SKU resolution instead of per-item API calls. Stock safety: if the target location has no record for an item, skip it — never emit 0 (that would push 0 and wipe real destination stock).

Registration

Register in packages/connectors/src/index.ts inside registerBuiltinConnectors() using the factory form (a new instance per resolve):

import { registerSource, registerDestination } from './registry.js';
import { MySourceConnector } from './my-source/index.js';
import { MyDestinationConnector } from './my-destination/index.js';

registerSource('my-source', () => new MySourceConnector());
registerDestination('my-destination', () => new MyDestinationConnector());

bootstrapConnectors(prisma) calls registerBuiltinConnectors() + loads DB-backed dynamic definitions; every app entrypoint uses it.


AI-generated connectors (code)

Use the MCP tool pullpush.connector.generate:

Tool: pullpush.connector.generate
Input:
  name: "shopify"
  connectorType: "source"
  apiDescription: "Shopify Admin REST API for orders and products"
  apiDocUrl: "https://shopify.dev/docs/api/admin-rest"

This runs a generate → typecheck → test → self-correct loop (up to 3 attempts):

  1. AI generates connector code from the API description/OpenAPI spec
  2. Code is written to packages/connectors/src/generated/<name>.ts
  3. TypeScript compiler validates the code
  4. If typecheck fails, AI self-corrects and retries
  5. On success, the connector is auto-registered in the registry

Generated connectors follow the same interface as builtins. You can later move them to the main source tree and customize.


Dynamic connectors (JSON definition)

For connectors that don't need custom code — just REST API calls with config.

Create via MCP

Tool: pullpush.connector.define
Input:
  name: "My REST API"
  apiDocUrl: "https://api.example.com/openapi.json"
  apiDescription: "REST API with OAuth2, polls /orders endpoint"

This generates a JSON definition and stores it as a ConnectorDefinition record in draft status.

Definition lifecycle

draft → review → active → archived
  • draft: Just generated, not yet usable
  • review: Validated but awaiting approval
  • active: Live and usable for connections (hot-registered on activation)
  • archived: Retired, not loaded on startup

Activate with:

Tool: pullpush.connector.definition.activate
Input: { id: "<definition-id>" }

Definition JSON structure

{
  "metadata": {
    "slug": "my-api",
    "type": "source",
    "description": "My REST API connector",
    "version": "1.0.0"
  },
  "protocol": "rest",
  "auth": {
    "type": "oauth2",
    "tokenUrl": "https://api.example.com/oauth/token",
    "scopes": ["read:orders"]
  },
  "endpoints": {
    "poll": {
      "method": "GET",
      "url": "https://api.example.com/orders",
      "params": { "since": "{{cursor}}", "limit": 100 },
      "pagination": { "type": "cursor", "field": "next_cursor" }
    },
    "test": {
      "method": "GET",
      "url": "https://api.example.com/status"
    }
  },
  "eventMapping": {
    "externalId": "$.id",
    "eventType": "order.updated",
    "occurredAt": "$.updated_at",
    "payload": "$"
  }
}

MCP protocol connectors

For connectors that delegate to an external MCP server:

{
  "metadata": { "slug": "ext-service", "type": "source" },
  "protocol": "mcp",
  "mcpConfig": {
    "serverUrl": "https://ext-service.example.com/mcp",
    "tools": {
      "poll": "ext.orders.list",
      "deliver": "ext.orders.create",
      "test": "ext.health"
    }
  }
}

Testing connectors

Builtin/generated (vitest)

Create a test file alongside the connector:

// packages/connectors/src/__tests__/my-source.test.ts
import { describe, it, expect } from 'vitest';
import { mySource } from '../my-source.js';

describe('my-source', () => {
  it('normalizes order payload', () => {
    const raw = { /* sample API response */ };
    const events = mySource.parseWebhook(JSON.stringify(raw), {});
    expect(events).toHaveLength(1);
    expect(events[0].eventType).toBe('order.created');
  });
});

Use golden-file recorded fixtures for deterministic tests:

  • Store sample API responses in packages/connectors/src/__fixtures__/
  • Assert normalized output matches expected snapshots

Dynamic definitions

Tool: pullpush.connector.definition.validate
Input: { definition: { ... } }
Tool: pullpush.connector.definition.test
Input: { definitionId: "...", connectionId: "..." }

Transform DSL

Connectors produce raw events; the transform DSL (defined in @kaduno/core) maps source payloads to destination shape. The DSL is shared between the runtime executor and the AI mapping prompt.

Key operations: map, rename, cast, default, template, lookup, flatten, filter.

Use pullpush.mapping.suggest to AI-generate a mapping from sample data, or pullpush.flow.dryRun to test a mapping against real events without delivery.

Dynamic-connector DSL (canonical)

The DynamicConnector interprets a per-entity JSON definition with: canonical_map (raw↔canonical field mapping + an extensions passthrough bag), body_template (non-REST envelopes, e.g. JSON-RPC), a declarative snapshot (paginated canonical scan), lookup (cross-API join: preload_all, on_miss skip|error|passthrough), pagination, auth, and rate_limiting (requests_per_second). Build/extend definitions with pullpush.connector.compose (canonical-aware) or pullpush.connector.define, then validate/activate. Prefer a DSL definition; drop to a custom TS connector only when logic is irreducible.


Publishing to the marketplace

Once a connector definition is active and working, you can share it with other tenants:

Tool: pullpush.marketplace.publish
Input:
  definitionId: "clxyz..."
  category: "ecommerce"         # crm, ecommerce, accounting, messaging, logistics, hr, custom
  tags: ["payments", "stripe"]
  description: "Stripe payment integration with order and refund sync"

This creates a marketplace listing that other tenants can browse and install. The definition is cloned into the installing tenant's scope — they get their own versioned copy.

Other tenants install via pullpush.marketplace.install and can update to newer published versions via pullpush.marketplace.update.


Best practices

  1. Always implement test() — it's used by connection.test and the health-check job
  2. Return retryable vs non-retryable errors — the pipeline uses this to decide retry vs dead-letter
  3. Use incremental cursors — don't re-fetch all data on every poll
  4. Handle rate limits — return { retryable: true } on 429; BullMQ handles backoff
  5. Normalize event types — use dot notation (order.created, product.updated)
  6. Keep config in the connection — never hardcode credentials in connector code
  7. Prefer pullpush.connector.compose — canonical-aware DSL generation; drop to code only when logic is irreducible
  8. Publish to marketplace — if your connector is reusable, publish it so other tenants benefit