internal/datasource/README.md
The data source sync framework enables WeKnora to automatically import and synchronize content from external platforms (Feishu, Notion, Confluence, etc.) into knowledge bases. This is the foundational layer upon which all specific connectors are built.
```
┌─────────────────────────────────────────────────────────────┐
│ External Data Sources                                       │
│ (Feishu, Notion, Confluence, GitHub, Google Drive...)       │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│ Connector Registry & Adapters                               │
│ Each platform (feishu/, notion/, confluence/, etc.)         │
│ implements the Connector interface                          │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│ DataSourceService (Business Logic)                          │
│ ├─ CreateDataSource / UpdateDataSource                      │
│ ├─ ManualSync / ListAvailableResources                      │
│ ├─ ValidateConnection / PauseDataSource                     │
│ └─ ProcessSync (asynq task handler)                         │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│ HTTP Handler & API Routes (/api/v1/datasource)              │
│ REST endpoints for UI/programmatic access                   │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│ Database (MySQL / PostgreSQL)                               │
│ ├─ data_sources (configuration & state)                     │
│ └─ sync_logs (history & metadata)                           │
└─────────────────────────────────────────────────────────────┘
```
A sync proceeds through these stages:

```
Manual Trigger or Scheduled Job (cron)
    ↓
Scheduler enqueues asynq Task
    ↓
ProcessSync (task handler) executes
    ↓
Connector.Fetch* methods pull data
    ↓
Diff engine identifies changes (new/update/delete)
    ↓
KnowledgeService creates/updates Knowledge entries
    ↓
Documents parsed → chunks → vectors → indexed
```
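The diff step can be pictured with the minimal sketch below. It assumes every fetched item carries a stable external ID and a content hash, and that the hashes recorded by the previous sync are available as a map keyed by external ID; `FetchedItem`, `ChangeSet`, and `diffItems` are hypothetical names, not the framework's actual diff engine.

```go
package main

import (
	"fmt"
	"sort"
)

// FetchedItem is a hypothetical, simplified item returned by a connector.
type FetchedItem struct {
	ExternalID  string // stable ID on the source platform
	ContentHash string // hash of the item's content
}

// ChangeSet groups external IDs by the action the sync should take.
type ChangeSet struct {
	Created []string
	Updated []string
	Deleted []string
}

// diffItems compares the items fetched in this run against the hashes
// recorded by the previous run (keyed by external ID).
// Deletions are only meaningful after a full fetch: an incremental fetch
// returns only changed items, so absence there does not imply deletion.
func diffItems(previous map[string]string, fetched []FetchedItem) ChangeSet {
	var cs ChangeSet
	seen := make(map[string]bool, len(fetched))
	for _, item := range fetched {
		seen[item.ExternalID] = true
		oldHash, existed := previous[item.ExternalID]
		switch {
		case !existed:
			cs.Created = append(cs.Created, item.ExternalID)
		case oldHash != item.ContentHash:
			cs.Updated = append(cs.Updated, item.ExternalID)
		}
	}
	for id := range previous {
		if !seen[id] {
			cs.Deleted = append(cs.Deleted, id)
		}
	}
	sort.Strings(cs.Deleted) // map iteration order is random; sort for stable output
	return cs
}

func main() {
	previous := map[string]string{"doc-1": "aaa", "doc-2": "bbb", "doc-3": "ccc"}
	fetched := []FetchedItem{
		{ExternalID: "doc-1", ContentHash: "aaa"}, // unchanged
		{ExternalID: "doc-2", ContentHash: "xyz"}, // content changed
		{ExternalID: "doc-4", ContentHash: "ddd"}, // new
	}
	cs := diffItems(previous, fetched)
	fmt.Println("created:", cs.Created) // created: [doc-4]
	fmt.Println("updated:", cs.Updated) // updated: [doc-2]
	fmt.Println("deleted:", cs.Deleted) // deleted: [doc-3]
}
```

Comparing content hashes rather than modification times means an item that was touched but not changed does not trigger re-parsing and re-embedding.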
Source layout:

```
internal/datasource/
├── connector.go                          # Connector interface & registry
├── errors.go                             # Error definitions
└── README.md                             # This file
internal/types/
├── datasource.go                         # Data models (DataSource, SyncLog, etc.)
└── interfaces/datasource.go              # Service interfaces
internal/application/
├── repository/datasource_repo.go         # Database access layer
└── service/datasource_service.go         # Business logic
internal/handler/
└── datasource.go                         # HTTP request handlers
internal/router/
├── router.go                             # Route registration
├── task.go                               # Asynq task registration
└── sync_task.go                          # Lite mode task registration
migrations/versioned/
└── 000028_datasource_tables.up/down.sql  # Database schema
```
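`DataSource` (defined in `internal/types/datasource.go`) represents a configured external data source; its configuration and state are persisted in the `data_sources` table.

`SyncLog` tracks the execution of each sync operation; sync history and metadata are persisted in the `sync_logs` table.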
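Every connector implements the `Connector` interface, defined in `internal/datasource/connector.go`:

```go
// Parameter types are abbreviated; see connector.go for the full signatures.
type Connector interface {
	// Type returns the connector's type identifier.
	Type() string
	// Validate tests connectivity and credentials.
	Validate(ctx, config) error
	// ListResources lists available resources (documents, spaces, etc.).
	ListResources(ctx, config) ([]Resource, error)
	// FetchAll performs a full sync of the specified resources.
	FetchAll(ctx, config, resourceIDs) ([]FetchedItem, error)
	// FetchIncremental fetches only items changed since cursor.
	FetchIncremental(ctx, config, cursor) ([]FetchedItem, *SyncCursor, error)
}
```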
The REST API is mounted under `/api/v1/datasource`:

```
POST   /api/v1/datasource                # Create
GET    /api/v1/datasource                # List (by kb_id)
GET    /api/v1/datasource/:id            # Get
PUT    /api/v1/datasource/:id            # Update
DELETE /api/v1/datasource/:id            # Delete
POST   /api/v1/datasource/:id/validate   # Test connection
GET    /api/v1/datasource/:id/resources  # List available resources
POST   /api/v1/datasource/:id/sync       # Trigger manual sync
POST   /api/v1/datasource/:id/pause      # Pause
POST   /api/v1/datasource/:id/resume     # Resume
GET    /api/v1/datasource/:id/logs       # List sync logs
GET    /api/v1/datasource/logs/:log_id   # Get specific log
GET    /api/v1/datasource/types          # List available connector types
```
To add a new connector:

Create the connector package in `internal/datasource/connector/<type>/`:

```
connector/<type>/
├── client.go      # API client
├── connector.go   # Implements the Connector interface
└── types.go       # Type-specific models
```
Implement the `Connector` interface:

```go
// Template: parameter types are abbreviated as in the interface definition,
// and each method body still needs a real implementation and return values.
type YourConnector struct {
	// fields, e.g. the platform API client from client.go
}

func (c *YourConnector) Type() string {
	return types.ConnectorTypeYourType
}

func (c *YourConnector) Validate(ctx, config) error {
	// Test connectivity and credentials
}

func (c *YourConnector) ListResources(ctx, config) ([]types.Resource, error) {
	// List available resources (documents, spaces, etc.)
}

func (c *YourConnector) FetchAll(ctx, config, resourceIDs) ([]types.FetchedItem, error) {
	// Full sync: fetch all items from the specified resources
}

func (c *YourConnector) FetchIncremental(ctx, config, cursor) ([]types.FetchedItem, *types.SyncCursor, error) {
	// Incremental sync: fetch only items changed since cursor
}
```
Register the connector in the container (in `container.go`):

```go
container.Provide(func() datasource.Connector {
	return yourconnector.NewConnector()
})
```
Add the connector's metadata in `connector.go`:

```go
// Entry in the connector metadata map
types.ConnectorTypeYourType: {
	Type:     types.ConnectorTypeYourType,
	Name:     "Your Platform",
	AuthType: "oauth2",
	// ...
},
```
The framework is designed to be testable.
All operations return detailed error information; shared error definitions live in `internal/datasource/errors.go`.