docs/contributing/parser-migration-guide.md
Audience: engineers porting an existing record-mode parser (in keploy,
keploy/integrations, or keploy/enterprise) from the legacy net.Conn-based path
to the V2 FakeConn + supervisor architecture. The V2 architecture (see
pkg/agent/proxy/README.md and PLAN.md) gives every migrated parser several
guarantees for free, among them the supervisor's fallback to globalPassThrough.

The migration is additive and opt-in. The legacy code path stays intact; the
parser routes between old and new based on whether the dispatcher populated
RecordSession.V2.
Every parser migration follows the same shape. Apply it mechanically unless the parser has mid-stream TLS (Postgres v2, MySQL) — those need the directive-channel extension documented below.
Add the IsV2 capability marker in your parser's type declaration file (often
myproto.go):

```go
// IsV2 opts this parser into the V2 FakeConn-based record path.
// The dispatcher in handleConnection performs a type assertion
// against integrations.IntegrationsV2; when IsV2 returns true,
// RecordOutgoing is invoked inside supervisor.Run.
func (p *MyProto) IsV2() bool { return true }
```
The marker is per-type. Return true only once the parser's
recordV2 is implemented and tested.
Turn RecordOutgoing into a dispatcher: rename the current body to recordLegacy
and put a tiny router in the interface-method slot:
```go
func (p *MyProto) RecordOutgoing(ctx context.Context, s *integrations.RecordSession) error {
	if s.V2 != nil {
		return p.recordV2(ctx, s.V2)
	}
	return p.recordLegacy(ctx, s)
}

// recordLegacy is the exact body that was previously RecordOutgoing.
// Do NOT refactor it during migration. Keep the legacy path bit-for-bit
// stable so parity tests can compare V2 against it.
func (p *MyProto) recordLegacy(ctx context.Context, s *integrations.RecordSession) error {
	// ... existing code, unchanged ...
}
```
Implement recordV2. The V2 session (*supervisor.Session) exposes:
| Field / method | Purpose |
|---|---|
| sess.ClientStream *fakeconn.FakeConn | Bytes the real client sent. Read via Read or ReadChunk. |
| sess.DestStream *fakeconn.FakeConn | Bytes the real destination sent. |
| sess.Directives chan<- directive.Directive | Send control messages (TLS upgrade, abort, finalize). |
| sess.Acks <-chan directive.Ack | Receive acks for directives. |
| sess.Mocks chan<- *models.Mock | Low-level mock channel (prefer EmitMock). |
| sess.EmitMock(m) error | Emit a mock (runs hook chain, respects incomplete-mock gate). |
| sess.MarkMockIncomplete(reason) | Drop the in-flight mock. |
| sess.MarkMockComplete() | Clear the incomplete flag. |
| sess.IsMockIncomplete() bool | Query before expensive work. |
| sess.AddPostRecordHook(h) | Front-of-chain wrapper hook. |
| sess.Logger *zap.Logger | Pre-scoped with connection fields. |
| sess.Ctx context.Context | Supervisor-managed lifetime. Respect it. |
| sess.Opts models.OutgoingOptions | Config (bypass rules, passwords, TLS configs, noise). |
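As an illustration of the incomplete-mock gate, here is a minimal sketch;
parseRows, payload, and resp are hypothetical stand-ins for your protocol's
decoding state, while IsMockIncomplete, MarkMockIncomplete, and Logger come
from the table above:

```go
// Check the gate before doing expensive decoding work.
if sess.IsMockIncomplete() {
	sess.Logger.Debug("mock already marked incomplete, skipping row decode")
} else if rows, err := parseRows(payload); err != nil {
	// Drop the in-flight mock but keep reading; the relay keeps forwarding traffic.
	sess.MarkMockIncomplete("row decode failed")
} else {
	resp.Rows = rows
}
```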
Use ReadChunk for timestamps, Read for byte streams:
```go
chunk, err := sess.ClientStream.ReadChunk()
if err != nil {
	if errors.Is(err, io.EOF) || errors.Is(err, fakeconn.ErrClosed) {
		// Normal close or supervisor abort; return cleanly.
		return nil
	}
	return err
}
// chunk.ReadAt is the canonical request-first-byte timestamp.
```
For byte-stream-oriented protocols (HTTP/1), you can pass the FakeConn
to a bufio.Reader — it satisfies net.Conn. Caveat:
bufio.Reader.ReadBytes can over-consume past your framing boundary
(e.g. into the next request in a pipeline). If that matters, use
ReadChunk directly and do your own buffering.
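A minimal sketch of that pattern for a line-framed protocol; the request-line
handling and the zap/strings calls here are illustrative, not the HTTP
parser's actual code:

```go
// FakeConn satisfies net.Conn, so a bufio.Reader can sit directly on top of it.
br := bufio.NewReader(sess.ClientStream)

// Read one CRLF-terminated line (e.g. an HTTP/1 request line). bufio may have
// pulled more bytes than this into its buffer, so keep reading from br, not
// from the raw FakeConn, for the rest of the request.
line, err := br.ReadString('\n')
if err != nil {
	if errors.Is(err, io.EOF) || errors.Is(err, fakeconn.ErrClosed) {
		return nil
	}
	return err
}
sess.Logger.Debug("request line", zap.String("line", strings.TrimRight(line, "\r\n")))
```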
Once you have a parsed request/response pair, build and emit the mock:

```go
mock := &models.Mock{
	Kind: models.MyProto,
	Spec: models.MockSpec{
		ReqTimestampMock: firstClientChunk.ReadAt,
		ResTimestampMock: lastDestChunk.WrittenAt,
		MyProtoReq:       parsedReq,
		MyProtoResp:      parsedResp,
		Metadata:         map[string]string{"type": "myproto"},
	},
	ConnectionID: sess.ClientConnID,
}
if err := sess.EmitMock(mock); err != nil {
	return err
}
```
See docs/reference/mock-matcher-contract.md for the exact shape
the replay matcher consumes. Mock output must be byte-equivalent
to the legacy path for the same input — write a parity test.
Never call time.Now() for ReqTimestampMock / ResTimestampMock.
Always source from chunk.ReadAt / chunk.WrittenAt. Log-line and
telemetry use of time.Now() is fine — prefix the call site with
// allow:time.Now to suppress the lint.
Enforced by tools/lint/no_timestamp_in_parser/. Run locally:
```
go run ./tools/lint/no_timestamp_in_parser/cmd/no_timestamp_in_parser ./...
```
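To make the rule concrete, a short sketch; mock, firstClientChunk, and
lastDestChunk are as in the earlier snippets, and the exact placement the
linter expects for the allow comment is defined by the tool itself:

```go
// Good: mock timestamps come from the chunks the relay captured.
mock.Spec.ReqTimestampMock = firstClientChunk.ReadAt
mock.Spec.ResTimestampMock = lastDestChunk.WrittenAt

// Bad: the lint rule rejects wall-clock time for mock timestamps.
// mock.Spec.ReqTimestampMock = time.Now()

// Fine: wall-clock time for logging/telemetry, with the exemption comment.
// allow:time.Now
started := time.Now()
defer func() {
	sess.Logger.Debug("recordV2 finished", zap.Duration("took", time.Since(started)))
}()
```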
Mid-stream TLS upgrades: Postgres's SSLRequest and MySQL's CLIENT_SSL are the
two known cases. SMTP's STARTTLS would be another if we add support. The
pattern:
```go
// 1. Read the pre-TLS prelude bytes from the FakeConn and emit a
//    config mock tagged with Metadata["tls_stage"] = "prelude".
// ...

// 2. Request the upgrade. destCfg is a *tls.Config; keploy acts as
//    TLS client to the real destination. clientCfg is a *tls.Config;
//    keploy acts as TLS server presenting the MITM cert to the real
//    client. The relay's injected TLSUpgradeFn owns the MITM cert
//    chain; clientCfg can be a minimal non-nil value to signal
//    "yes, upgrade the client side".
sess.Directives <- directive.UpgradeTLS(destCfg, clientCfg, "myproto tls_start")

// 3. Wait for the ack.
var ack directive.Ack
select {
case ack = <-sess.Acks:
case <-sess.Ctx.Done():
	return sess.Ctx.Err()
}
if !ack.OK {
	sess.MarkMockIncomplete("tls upgrade failed")
	return fmt.Errorf("tls upgrade: %w", ack.Err)
}

// 4. From this point, subsequent chunks on ClientStream and
//    DestStream are post-TLS plaintext. The relay did the handshake
//    on the real sockets; you just keep reading.
```
For the TLS config builders, use the system trust store by
default (i.e. do not set InsecureSkipVerify: true). CodeQL flags
InsecureSkipVerify, and the supervisor's fallback-to-passthrough
handles the case where the upstream cert doesn't verify — traffic
still flows, the mock is dropped. See
pkg/agent/proxy/integrations/mysql/recorder/record_v2.go:buildDestTLSConfigV2
for the reference pattern.
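A minimal sketch in that spirit; upstreamHost is a hypothetical name for the
address you are recording against, and buildDestTLSConfigV2 (linked above)
remains the reference implementation:

```go
// Destination side: keploy dials the real upstream as a TLS client. Leaving
// RootCAs nil uses the system trust store; InsecureSkipVerify stays false.
destCfg := &tls.Config{
	ServerName: upstreamHost, // name the upstream certificate must verify against
	MinVersion: tls.VersionTLS12,
}

// Client side: the relay's TLSUpgradeFn owns the MITM certificate chain, so a
// minimal non-nil config is enough to signal "upgrade the client side too".
clientCfg := &tls.Config{MinVersion: tls.VersionTLS12}
```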
When the session ends (client close or supervisor abort),
ClientStream.ReadChunk returns io.EOF or ErrClosed. See
pkg/agent/proxy/integrations/http/recordv2.go.

Existing decoders that take a net.Conn or io.Reader work on FakeConn
unchanged. See pkg/agent/proxy/integrations/mysql/recorder/record_v2.go.

Add tests in the same directory as your parser, with a v2_test.go suffix or in
a separate file:
- TestMyProto_RecordV2_HappyPath: drive canned bytes through a fakeconn pair; assert mock shape and chunk-derived timestamps.
- TestMyProto_RecordV2_Parity: feed identical bytes into legacy and V2; assert mocks are equivalent on non-timestamp fields.
- TestMyProto_RecordV2_TLSUpgrade: if your protocol has one.
- TestMyProto_IsV2: guards the capability marker.
Use fakeconn.New(ch, nil, nil) to construct FakeConns directly and
push Chunk values on the channel with known ReadAt/WrittenAt
values (e.g. time.Unix(1000, 0)) to prove you're not calling
time.Now().
For mid-stream TLS, plug a stub directive processor: drain
sess.Directives, push back an Ack{OK:true} (or OK:false for the
failure test) on sess.Acks.
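Putting those pieces together, a test-harness sketch; the Chunk payload field
(Data), the Session literal shape, and channel-close-as-EOF behaviour are
assumptions to adjust against the real types:

```go
clientCh := make(chan fakeconn.Chunk, 2)
clientCh <- fakeconn.Chunk{Data: []byte("PING\r\n"), ReadAt: time.Unix(1000, 0)}
close(clientCh) // assumed to surface as io.EOF to the parser

directives := make(chan directive.Directive, 1)
acks := make(chan directive.Ack, 1)

// Stub directive processor: ack everything so a TLS-upgrade path can proceed.
go func() {
	for range directives {
		acks <- directive.Ack{OK: true}
	}
}()

sess := &supervisor.Session{
	ClientStream: fakeconn.New(clientCh, nil, nil),
	Directives:   directives,
	Acks:         acks,
	Logger:       zap.NewNop(),
	Ctx:          context.Background(),
}
```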
From the repo root:
```
go build ./...
go vet ./...
gofmt -l <your parser dir>
go test -race -count=1 -timeout 60s ./<your parser dir>/...
go run ./tools/lint/no_timestamp_in_parser/cmd/no_timestamp_in_parser ./<your parser dir>/...
```
All clean, all green. Commit with DCO sign-off:
```
git commit -s -m "feat(<proto>): native-migrate record path to V2 FakeConn architecture"
```
Three parsers are migrated on PR #4113; use them as templates:

- pkg/agent/proxy/integrations/http/recordv2.go. No TLS directive; simplest shape. Parity test against parseFinalHTTP.
- pkg/agent/proxy/integrations/mysql/recorder/record_v2.go. Mid-stream TLS via directive.UpgradeTLS. Full handshake + auth + query state machine.
- pkg/agent/proxy/integrations/generic/encode_v2.go. Concurrent reader goroutines pair request/response.

Common pitfalls:

- Don't call session.Ingress.Write(...). In V2 mode, Ingress and Egress are nil; your reference to them nil-derefs. The relay is the only writer. If your legacy code writes a response to the client, you don't need that in V2; the relay already forwarded it.
- Don't special-case SessionOnAbort. Your parser just reads until EOF or ErrClosed and returns.
- Don't spawn goroutines that ignore sess.Ctx. They'll leak. Either use sess.Ctx directly, or register via supervisor.RegisterGoroutine() (available when direct supervisor access is wired; today use sess.Ctx). See the sketch after this list.
- Don't call time.Now() indiscriminately. The lint rule's // allow:time.Now exemption is for log lines and metrics. ReqTimestampMock / ResTimestampMock must come from chunks.
- Don't remove the memoryguard.IsRecordingPaused calls from the legacy path. The relay handles it for V2 at the tee; legacy parsers still need their checks because they're not behind the relay. The redundancy disappears naturally as parsers migrate.
- Add IsV2() bool { return true } last, after recordV2 is landed and tested. A parser returning true without a working recordV2 will crash in the dispatcher.
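A minimal sketch of the goroutine pitfall above; assuming a destination-side
reader goroutine, tie its lifetime to sess.Ctx and the stream's close
semantics:

```go
errCh := make(chan error, 1)
go func() {
	for {
		chunk, err := sess.DestStream.ReadChunk()
		if err != nil {
			errCh <- err // io.EOF / fakeconn.ErrClosed unblocks this goroutine on teardown
			return
		}
		_ = chunk // hand off to your response assembler
	}
}()

select {
case err := <-errCh:
	if errors.Is(err, io.EOF) || errors.Is(err, fakeconn.ErrClosed) {
		return nil
	}
	return err
case <-sess.Ctx.Done():
	return sess.Ctx.Err()
}
```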
Escape hatches:

- KEPLOY_NEW_RELAY=off forces every parser (including yours) back to the legacy path. Use during incidents.
- KEPLOY_DISABLE_PARSING=1 disables parser dispatch entirely; every connection goes to raw passthrough.

Further reading:

- pkg/agent/proxy/README.md: architecture overview.
- docs/reference/mock-matcher-contract.md: what the replay matcher actually reads.
- PLAN.md (repo root): the multi-phase design.

Ask in #keploy-record-v2 (or equivalent) before breaking new ground; the core
V2 maintainers will usually have seen the corner case before.