Connector Authoring Guide
How to build connectors for Kaduno Pullpush — AI-generated, manually coded, or JSON-defined.
Connector types
| Type | Where it lives | How it's created |
|---|---|---|
| Builtin (code) | packages/connectors/src/ | Manually coded TypeScript |
| AI-generated (code) | packages/connectors/src/generated/ | pullpush.connector.generate MCP tool |
| Dynamic (JSON definition) | Database (ConnectorDefinition table) | pullpush.connector.define MCP tool or Admin UI |
| MCP protocol | Database (protocol: mcp) | JSON definition that delegates to an external MCP server |
Canonical-aware. Connectors are built around the canonical data model. Source connectors emit canonical events (NormalizedEvent carries canonicalType/canonicalVersion); destinations consume canonical payloads in deliver(); and connectors should implement the optional snapshot(connection, opts): AsyncIterable<SnapshotItem>, which yields current state in canonical space ({ key, canonicalType, fields }). This is what powers sync-preview and the reconcile→deliver loop. A snapshot must distinguish a real 0 from "no data": never emit 0 for a missing scope, or you will wipe destination stock.
Credentials are envelope-encrypted. connection.config is empty at rest; callers hydrate it via await hydrateConnectionConfig(prisma, connection) before invoking a connector. Inside a connector, read connection.config (already hydrated by the caller); never decrypt or log raw credentials yourself.
Builtin connectors (manual TypeScript)
Source connector interface
import type { SourceConnector, RawChange } from '@kaduno/connectors';

export class MySourceConnector implements SourceConnector {
  type = 'my-source';

  async *poll(connection, cursor): AsyncIterable<RawChange> {
    // Yield raw changes since `cursor` (ISO timestamp or page token)
    yield { type: 'item.updated', externalId: '123', payload: { /* ... */ }, cursor: '...' };
  }

  supportsWebhook(): boolean {
    return true; // or false
  }

  verifyWebhook(rawBody: string, signature: string, config: Record<string, unknown>): boolean {
    // HMAC verification
  }

  parseWebhook(rawBody: string, headers: Record<string, string>): RawChange[] {
    // Parse incoming webhook payload into raw changes
  }

  async test(connection): Promise<boolean> {
    // Verify credentials work (e.g. GET /api/status)
  }
}
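The verifyWebhook hook above is usually an HMAC check. A minimal sketch, assuming the provider signs the raw body with HMAC-SHA256 and sends the digest hex-encoded, and that the shared secret sits under a hypothetical webhookSecret key in the hydrated config. Real providers differ in algorithm, encoding, and header name, so treat this as an illustration only:

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Assumption: hex-encoded HMAC-SHA256 over the raw body.
// `webhookSecret` is a hypothetical config key, not a documented one.
function verifyHmacSignature(
  rawBody: string,
  signature: string,
  config: Record<string, unknown>,
): boolean {
  const secret = config['webhookSecret'];
  if (typeof secret !== 'string' || secret.length === 0) return false;

  const expected = createHmac('sha256', secret).update(rawBody, 'utf8').digest();
  const received = Buffer.from(signature, 'hex');

  // timingSafeEqual throws when lengths differ, so check that first.
  if (received.length !== expected.length) return false;
  return timingSafeEqual(received, expected);
}
```

Verifying against the raw body (not a re-serialized object) matters, because any whitespace or key-order change alters the digest.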
Destination connector interface
import type { DestinationConnector, DeliveryResult } from '@kaduno/connectors';

export class MyDestinationConnector implements DestinationConnector {
  type = 'my-destination';

  async deliver(connection, payload, event): Promise<DeliveryResult> {
    // Send the transformed payload to the destination system
    // Return { success: true, latencyMs } or { success: false, retryable: true, errorMessage: '...' }
  }

  async test(connection): Promise<boolean> {
    // Verify credentials work
  }
}
Built-in connectors are classes implementing SourceConnector/DestinationConnector (see magento/, linnworks/, abicart/). The destination signature is deliver(connection, payload, event) and returns { success, errorCode?, errorMessage?, latencyMs? }. Add the optional snapshot() to make the connector previewable/reconcilable.
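To make the deliver() result shape concrete, here is a small sketch that maps an HTTP status to a result object. The DeliveryResultSketch interface is a local stand-in that merges the fields mentioned in this guide (the real type comes from @kaduno/connectors), and both the HTTP_<status> error code format and the rule "429 and 5xx are retryable" are assumptions:

```typescript
// Local stand-in for the documented result shape; not the real exported type.
interface DeliveryResultSketch {
  success: boolean;
  retryable?: boolean;
  errorCode?: string;
  errorMessage?: string;
  latencyMs?: number;
}

// Assumption: 429 and 5xx are transient; any other non-2xx status is permanent.
function toDeliveryResult(status: number, latencyMs: number): DeliveryResultSketch {
  if (status >= 200 && status < 300) {
    return { success: true, latencyMs };
  }
  const retryable = status === 429 || status >= 500;
  return {
    success: false,
    retryable,
    errorCode: `HTTP_${status}`, // hypothetical code format
    errorMessage: `Destination responded with HTTP ${status}`,
    latencyMs,
  };
}
```

A deliver() implementation could time its request, then return toDeliveryResult(response.status, elapsedMs) so the pipeline can choose between retry and dead-letter.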
snapshot(connection, opts?) contract. Yield canonical SnapshotItems ({ key, canonicalType, fields }).
- opts.fieldHint is a scope hint (e.g. a target stock location).
- opts.onlyKeys (when present) restricts the snapshot to a specific set of canonical keys: emit at most those keys. A source that supports it should resolve channel-specific keys to the destination's keys (e.g. Linnworks resolves Abicart article numbers via master SKU first, then uses the preloaded channelSkuCache for instant channel-SKU lookups) so the diff lines up and a destination that carries a subset doesn't trigger a full-catalog fan-out. The pipeline derives onlyKeys from the destination snapshot when scopeToDestination/scopeSourceToDestination is set (reconcile / preview).
- opts.channelSkuCache is auto-loaded from the ChannelSkuCache table (built by the channel-sku-cache background job every 12h); connectors that support it get O(1) channel-SKU resolution instead of per-item API calls.
- Stock safety: if the target location has no record for an item, skip it. Never emit 0 (that would push 0 and wipe real destination stock).
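The contract above can be sketched as an async generator. This is an illustration under stated assumptions: the Sketch interfaces are local stand-ins for the real types, RawStockRecord is a hypothetical upstream shape, and the 'stock' canonical type and quantity field name are placeholders rather than documented values.

```typescript
// Local stand-ins for the real SnapshotItem / opts types.
interface SnapshotItemSketch {
  key: string;
  canonicalType: string;
  fields: Record<string, unknown>;
}

interface SnapshotOptsSketch {
  fieldHint?: string; // here: the target stock location
  onlyKeys?: string[];
}

// Hypothetical raw record: stock levels keyed by location id.
interface RawStockRecord {
  sku: string;
  levels: Record<string, number>;
}

async function* snapshotStock(
  records: Iterable<RawStockRecord>,
  opts: SnapshotOptsSketch = {},
): AsyncIterable<SnapshotItemSketch> {
  const location = opts.fieldHint;
  if (location === undefined) return; // no scope given: nothing can be stated safely

  const only = opts.onlyKeys ? new Set(opts.onlyKeys) : undefined;
  for (const record of records) {
    if (only && !only.has(record.sku)) continue; // emit at most the requested keys

    const quantity = record.levels[location];
    if (quantity === undefined) continue; // no record for this location: skip, never emit 0

    // A stored 0 is real data and is emitted as-is.
    yield { key: record.sku, canonicalType: 'stock', fields: { quantity } };
  }
}
```

The key point is the difference between the two continue branches and a real zero: an item stored with quantity 0 is yielded, an item with no record at the location is not.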
Registration
Register in packages/connectors/src/index.ts inside registerBuiltinConnectors() using the
factory form (a new instance per resolve):
import { registerSource, registerDestination } from './registry.js';
import { MySourceConnector } from './my-source/index.js';
import { MyDestinationConnector } from './my-destination/index.js';
registerSource('my-source', () => new MySourceConnector());
registerDestination('my-destination', () => new MyDestinationConnector());
bootstrapConnectors(prisma) calls registerBuiltinConnectors() + loads DB-backed dynamic
definitions; every app entrypoint uses it.
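To illustrate why the factory form matters, here is a hypothetical stand-in for the registry (the real one lives in ./registry.js and may differ). Storing a factory instead of an instance means every resolve returns a fresh connector, so per-connection state cannot leak between callers:

```typescript
// Hypothetical registry sketch; names ending in "Sketch" are not the real API.
interface ConnectorSketch {
  type: string;
}
type Factory<T> = () => T;

const sourceFactories = new Map<string, Factory<ConnectorSketch>>();

function registerSourceSketch(type: string, factory: Factory<ConnectorSketch>): void {
  sourceFactories.set(type, factory);
}

function resolveSourceSketch(type: string): ConnectorSketch {
  const factory = sourceFactories.get(type);
  if (!factory) throw new Error(`No source connector registered for type "${type}"`);
  return factory(); // a new instance on every resolve
}

// Example connector with mutable state that must not be shared.
class CountingSource implements ConnectorSketch {
  type = 'counting';
  polls = 0;
}

registerSourceSketch('counting', () => new CountingSource());
```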
AI-generated connectors (code)
Use the MCP tool pullpush.connector.generate:
Tool: pullpush.connector.generate
Input:
  name: "shopify"
  connectorType: "source"
  apiDescription: "Shopify Admin REST API for orders and products"
  apiDocUrl: "https://shopify.dev/docs/api/admin-rest"
This runs a generate → typecheck → test → self-correct loop (up to 3 attempts):
- AI generates connector code from the API description/OpenAPI spec
- Code is written to packages/connectors/src/generated/<name>.ts
- TypeScript compiler validates the code
- If typecheck fails, AI self-corrects and retries
- On success, the connector is auto-registered in the registry
Generated connectors follow the same interface as builtins. You can later move them to the main source tree and customize.
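The generate → typecheck → test → self-correct loop can be pictured roughly as follows. This is a sketch, not the tool's implementation: GenerateDeps and its three functions are hypothetical injection points standing in for the AI call, the TypeScript compiler, and the test runner.

```typescript
// Hypothetical dependencies; each check returns a list of error messages (empty when clean).
interface GenerateDeps {
  generate(feedback: string | null): Promise<string>;
  typecheck(code: string): Promise<string[]>;
  runTests(code: string): Promise<string[]>;
}

interface GenerateOutcome {
  ok: boolean;
  attempts: number;
  code: string | null;
  errors: string[];
}

async function generateWithSelfCorrection(
  deps: GenerateDeps,
  maxAttempts = 3,
): Promise<GenerateOutcome> {
  let feedback: string | null = null;
  let errors: string[] = [];

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const code = await deps.generate(feedback);

    // Only run tests once the code compiles.
    const typeErrors = await deps.typecheck(code);
    errors = typeErrors.length > 0 ? typeErrors : await deps.runTests(code);

    if (errors.length === 0) {
      return { ok: true, attempts: attempt, code, errors: [] };
    }
    feedback = errors.join('\n'); // fed into the next generation attempt
  }
  return { ok: false, attempts: maxAttempts, code: null, errors };
}
```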
Dynamic connectors (JSON definition)
For connectors that don't need custom code — just REST API calls with config.
Create via MCP
Tool: pullpush.connector.define
Input:
  name: "My REST API"
  apiDocUrl: "https://api.example.com/openapi.json"
  apiDescription: "REST API with OAuth2, polls /orders endpoint"
This generates a JSON definition and stores it as a ConnectorDefinition record in draft status.
Definition lifecycle
draft → review → active → archived
- draft: Just generated, not yet usable
- review: Validated but awaiting approval
- active: Live and usable for connections (hot-registered on activation)
- archived: Retired, not loaded on startup
Activate with:
Tool: pullpush.connector.definition.activate
Input: { id: "<definition-id>" }
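The lifecycle above can be modelled as a tiny state machine. A sketch, assuming transitions are strictly linear as the arrow diagram suggests; the actual service may allow other moves (for example activating straight from draft, or un-archiving), which this guide does not specify:

```typescript
type DefinitionStatus = 'draft' | 'review' | 'active' | 'archived';

// Assumption: each status has exactly one successor, matching draft → review → active → archived.
const NEXT_STATUS: Record<DefinitionStatus, DefinitionStatus | null> = {
  draft: 'review',
  review: 'active',
  active: 'archived',
  archived: null,
};

function canTransition(from: DefinitionStatus, to: DefinitionStatus): boolean {
  return NEXT_STATUS[from] === to;
}

// Only active definitions are usable for connections.
function isUsableForConnections(status: DefinitionStatus): boolean {
  return status === 'active';
}
```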
Definition JSON structure
{
  "metadata": {
    "slug": "my-api",
    "type": "source",
    "description": "My REST API connector",
    "version": "1.0.0"
  },
  "protocol": "rest",
  "auth": {
    "type": "oauth2",
    "tokenUrl": "https://api.example.com/oauth/token",
    "scopes": ["read:orders"]
  },
  "endpoints": {
    "poll": {
      "method": "GET",
      "url": "https://api.example.com/orders",
      "params": { "since": "{{cursor}}", "limit": 100 },
      "pagination": { "type": "cursor", "field": "next_cursor" }
    },
    "test": {
      "method": "GET",
      "url": "https://api.example.com/status"
    }
  },
  "eventMapping": {
    "externalId": "$.id",
    "eventType": "order.updated",
    "occurredAt": "$.updated_at",
    "payload": "$"
  }
}
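To show how an eventMapping block like the one above might be applied to a single response item, here is a minimal sketch. It assumes that values starting with $ are simple dot paths into the item and that anything else is a literal; the real engine may use a full JSONPath implementation, so the helper names and semantics here are illustrative only.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Assumption: "$" is the whole item, "$.a.b" is a dot path, anything else is a literal.
function resolveMappingValue(expr: string, item: Json): Json | undefined {
  if (expr === '$') return item;
  if (!expr.startsWith('$.')) return expr;

  let current: Json | undefined = item;
  for (const segment of expr.slice(2).split('.')) {
    if (current === null || typeof current !== 'object' || Array.isArray(current)) {
      return undefined; // path runs through a non-object
    }
    current = current[segment];
  }
  return current;
}

interface EventMappingSketch {
  externalId: string;
  eventType: string;
  occurredAt: string;
  payload: string;
}

function applyEventMapping(mapping: EventMappingSketch, item: Json) {
  return {
    externalId: resolveMappingValue(mapping.externalId, item),
    eventType: resolveMappingValue(mapping.eventType, item),
    occurredAt: resolveMappingValue(mapping.occurredAt, item),
    payload: resolveMappingValue(mapping.payload, item),
  };
}
```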
MCP protocol connectors
For connectors that delegate to an external MCP server:
{
  "metadata": { "slug": "ext-service", "type": "source" },
  "protocol": "mcp",
  "mcpConfig": {
    "serverUrl": "https://ext-service.example.com/mcp",
    "tools": {
      "poll": "ext.orders.list",
      "deliver": "ext.orders.create",
      "test": "ext.health"
    }
  }
}
Testing connectors
Builtin/generated (vitest)
Create a test file alongside the connector:
// packages/connectors/src/__tests__/my-source.test.ts
import { describe, it, expect } from 'vitest';
import { MySourceConnector } from '../my-source/index.js';

describe('my-source', () => {
  it('normalizes order payload', () => {
    const connector = new MySourceConnector();
    const raw = { /* sample API response */ };
    // parseWebhook returns RawChange[], whose event name is in `type`
    const events = connector.parseWebhook(JSON.stringify(raw), {});
    expect(events).toHaveLength(1);
    expect(events[0].type).toBe('order.created');
  });
});
Use golden-file recorded fixtures for deterministic tests:
- Store sample API responses in packages/connectors/src/__fixtures__/
- Assert normalized output matches expected snapshots
Dynamic definitions
Tool: pullpush.connector.definition.validate
Input: { definition: { ... } }
Tool: pullpush.connector.definition.test
Input: { definitionId: "...", connectionId: "..." }
Transform DSL
Connectors produce raw events; the transform DSL (defined in @kaduno/core) maps source payloads to destination shape. The DSL is shared between the runtime executor and the AI mapping prompt.
Key operations: map, rename, cast, default, template, lookup, flatten, filter.
Use pullpush.mapping.suggest to AI-generate a mapping from sample data, or pullpush.flow.dryRun to test a mapping against real events without delivery.
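As a rough illustration of what three of these operations do (rename, cast, default), here is a small interpreter. The TransformOp shape is hypothetical: the real DSL is defined in @kaduno/core and its syntax is not shown in this guide.

```typescript
// Hypothetical operation shapes, for illustration only.
type TransformOp =
  | { op: 'rename'; from: string; to: string }
  | { op: 'default'; field: string; value: unknown }
  | { op: 'cast'; field: string; to: 'number' | 'string' };

function applyTransform(
  input: Record<string, unknown>,
  ops: TransformOp[],
): Record<string, unknown> {
  const out: Record<string, unknown> = { ...input }; // never mutate the source payload
  for (const step of ops) {
    switch (step.op) {
      case 'rename':
        if (step.from in out) {
          out[step.to] = out[step.from];
          delete out[step.from];
        }
        break;
      case 'default':
        if (out[step.field] === undefined || out[step.field] === null) {
          out[step.field] = step.value;
        }
        break;
      case 'cast':
        if (out[step.field] !== undefined) {
          out[step.field] =
            step.to === 'number' ? Number(out[step.field]) : String(out[step.field]);
        }
        break;
    }
  }
  return out;
}
```

Operations run in order, so a rename followed by a cast on the new field name behaves as expected.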
Dynamic-connector DSL (canonical)
The DynamicConnector interprets a per-entity JSON definition with: canonical_map (raw↔canonical
field mapping + an extensions passthrough bag), body_template (non-REST envelopes, e.g.
JSON-RPC), a declarative snapshot (paginated canonical scan), lookup (cross-API join:
preload_all, on_miss skip|error|passthrough), pagination, auth, and rate_limiting
(requests_per_second). Build/extend definitions with pullpush.connector.compose (canonical-aware)
or pullpush.connector.define, then validate/activate. Prefer a DSL definition; drop to a custom
TS connector only when logic is irreducible.
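The lookup options named above (preload_all with on_miss skip|error|passthrough) could behave along these lines. A sketch under assumptions: the table is preloaded once into a Map, "passthrough" is taken to mean "keep the original key", and SKIP is a hypothetical sentinel for "drop this item".

```typescript
type OnMiss = 'skip' | 'error' | 'passthrough';

// Hypothetical sentinel meaning "drop this item from the output".
const SKIP = Symbol('skip');

// preload_all: build the whole join table once, then resolve each key in O(1).
function preloadLookup(rows: Array<{ from: string; to: string }>): Map<string, string> {
  return new Map(rows.map((row) => [row.from, row.to] as [string, string]));
}

function resolveLookup(
  table: ReadonlyMap<string, string>,
  key: string,
  onMiss: OnMiss,
): string | typeof SKIP {
  const hit = table.get(key);
  if (hit !== undefined) return hit;

  switch (onMiss) {
    case 'skip':
      return SKIP;
    case 'passthrough':
      return key; // assumption: the unresolved key is kept as-is
    case 'error':
      throw new Error(`Lookup miss for key "${key}"`);
  }
}
```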
Publishing to the marketplace
Once a connector definition is active and working, you can share it with other tenants:
Tool: pullpush.marketplace.publish
Input:
  definitionId: "clxyz..."
  category: "ecommerce" # crm, ecommerce, accounting, messaging, logistics, hr, custom
  tags: ["payments", "stripe"]
  description: "Stripe payment integration with order and refund sync"
This creates a marketplace listing that other tenants can browse and install. The definition is cloned into the installing tenant's scope — they get their own versioned copy.
Other tenants install via pullpush.marketplace.install and can update to newer published
versions via pullpush.marketplace.update.
Best practices
- Always implement test(): it's used by connection.test and the health-check job
- Return retryable vs non-retryable errors: the pipeline uses this to decide retry vs dead-letter
- Use incremental cursors: don't re-fetch all data on every poll
- Handle rate limits: return { retryable: true } on 429; BullMQ handles backoff
- Normalize event types: use dot notation (order.created, product.updated)
- Keep config in the connection: never hardcode credentials in connector code
- Prefer pullpush.connector.compose: canonical-aware DSL generation; drop to code only when logic is irreducible
- Publish to marketplace: if your connector is reusable, publish it so other tenants benefit
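The incremental-cursor advice can be sketched as follows. FetchPage is a hypothetical page fetcher that returns items updated strictly after the given timestamp, oldest first; the cursor advances with every yielded change so a restart resumes where the last poll stopped instead of re-fetching everything.

```typescript
interface ChangeSketch {
  externalId: string;
  updatedAt: string; // ISO timestamp
}

// Hypothetical fetcher: items updated strictly after `since`, oldest first.
type FetchPage = (since: string | null) => Promise<ChangeSketch[]>;

async function* pollIncrementally(
  fetchPage: FetchPage,
  cursor: string | null,
): AsyncIterable<{ change: ChangeSketch; cursor: string }> {
  let since = cursor;
  for (;;) {
    const page = await fetchPage(since);
    if (page.length === 0) return; // caught up

    for (const change of page) {
      since = change.updatedAt; // advance the cursor with each item
      yield { change, cursor: since };
    }
  }
}
```

The sketch relies on the fetcher honouring "strictly after"; a fetcher that returns the boundary item again would make the loop repeat it.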