gems/gitlab-active-context/doc/usage.md
Migrations are similar to database migrations: they create collections, update schemas, run backfills, etc.
See migrations for more details.
A migration worker applies migrations for the active connection. See Migrations.
To run the worker manually, execute:

```ruby
Ai::ActiveContext::MigrationWorker.new.perform
```
Queues track items that need to be processed in bulk asynchronously. Each queue definition has a unique key and registers one queue per shard, so the number of queues equals the number of shards defined.
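As a rough illustration of how one queue definition fans out into per-shard queues, the sketch below uses a hypothetical `ShardedQueueSketch` class. The key format, the CRC32-based shard selection, and every name in it are assumptions made for illustration; it is not the gem's implementation.

```ruby
require 'zlib'

# Hypothetical stand-in for a sharded queue definition.
# It is NOT ActiveContext::Concerns::Queue; names and hashing are assumptions.
class ShardedQueueSketch
  attr_reader :key, :number_of_shards, :shard_limit

  def initialize(key:, number_of_shards:, shard_limit: 500)
    @key = key
    @number_of_shards = number_of_shards
    @shard_limit = shard_limit
    # One in-memory list per shard stands in for the real backing store.
    @shards = Array.new(number_of_shards) { [] }
  end

  # One queue name per shard, all derived from the same unique key.
  def shard_names
    (0...number_of_shards).map { |shard| "#{key}:#{shard}" }
  end

  # Deterministically map an item to a shard so the same item always
  # lands in the same queue.
  def shard_for(item)
    Zlib.crc32(item.to_s) % number_of_shards
  end

  def push(item)
    @shards[shard_for(item)] << item
  end

  # Take at most shard_limit items from one shard as a bulk batch.
  def pop_batch(shard)
    @shards[shard].shift(shard_limit)
  end

  def size
    @shards.sum(&:size)
  end
end

queue = ShardedQueueSketch.new(key: 'merge_request', number_of_shards: 2, shard_limit: 3)
10.times { |id| queue.push("ref-#{id}") }

puts queue.shard_names.inspect # prints ["merge_request:0", "merge_request:1"]
batch = queue.pop_batch(0)
puts "shard 0 batch size: #{batch.size}"
```

The batch size never exceeds `shard_limit`, which is the role the overridable `shard_limit` method appears to play in the queue definition.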
To create a new queue:
Add a file, extend `ActiveContext::Concerns::Queue`, and define `number_of_shards`. You can also override the `shard_limit` method.

```ruby
# frozen_string_literal: true

module Ai
  module Context
    module Queues
      class MergeRequest
        class << self
          include ActiveContext::Concerns::Queue

          def number_of_shards
            2
          end

          def shard_limit
            500
          end
        end
      end
    end
  end
end
```

Register the queue class by adding it to `config.queue_classes` in `config/initializers/active_context.rb`.

```ruby
ActiveContext.configure do |config|
  config.queue_classes = [::Ai::Context::Queues::MergeRequest]
end
```

To access the unique queues:

```ruby
ActiveContext.queues
=> #<Set: {"ai_context_queues:{merge_request}"}>
```

To view sharded queues:

```ruby
ActiveContext.raw_queues
=> [#<Ai::Context::Queues::MergeRequest:0x0000000177cdf460 @shard=0>,
 #<Ai::Context::Queues::MergeRequest:0x0000000177cdf370 @shard=1>]
```
To create a reference, create a class under `lib/active_context/references/` that inherits from the `Reference` class and defines the following methods:

Class methods required:

- `serialize_data`: defines a string representation of the reference object

Instance methods required:

- `init`: reads from `serialized_args`
- `as_indexed_json` or `as_indexed_jsons`: a hash or array of hashes containing the data representation of the object
- `operation`: determines the operation, which can be one of `upsert`, `update`, or `delete`. See operation types for more details.
- `identifier`: unique identifier

Optional methods:

- `unique_identifiers`: array of identifiers to build a unique identifier for every document. For example, `[identifier, branch_name]`. Defaults to `[identifier]`

Existing preprocessors are:

- `Preload`: preloads from the database to prevent N+1 queries
- `ContentFetcher`: fetches content from existing documents in the vector store
- `Embeddings`: generates embeddings for every document in bulk

These preprocessors rely on the document with content already stored in the vector store. If you need ActiveContext to handle the initial storage of documents in the vector store, you'll need to add a new preprocessor for that.
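To make the idea of chained, bulk preprocessors concrete, here is a minimal framework-free sketch. `ReferenceSketch`, `NoteRefSketch`, the `preprocess` method, and the rule that each block returns the refs to keep are all assumptions for illustration; the real `ActiveContext::Reference` contract may differ.

```ruby
# Hypothetical mini-framework showing preprocessors chained over a batch of refs.
# It only mirrors the shape of add_preprocessor; it is not the gem's code.
class ReferenceSketch
  class << self
    def preprocessors
      @preprocessors ||= []
    end

    # Register a named block that receives the whole batch of refs.
    def add_preprocessor(name, &block)
      preprocessors << { name: name, block: block }
    end

    # Run each preprocessor in registration order, feeding the output of one
    # into the next. Assumption: each block returns the refs to keep.
    def preprocess(refs)
      preprocessors.reduce(refs) do |current, preprocessor|
        preprocessor[:block].call(current)
      end
    end
  end
end

class NoteRefSketch < ReferenceSketch
  # Stand-in for a database table.
  RECORDS = { 1 => 'first note', 2 => 'second note' }.freeze

  attr_reader :identifier
  attr_accessor :record, :embedding

  def initialize(identifier)
    @identifier = identifier
  end

  # One bulk lookup for the whole batch instead of one query per ref.
  add_preprocessor :preload do |refs|
    found = RECORDS.slice(*refs.map(&:identifier))
    refs.each { |ref| ref.record = found[ref.identifier] }
    refs.select(&:record)
  end

  # Fake "embedding": the byte values of the first three characters.
  add_preprocessor :embeddings do |refs|
    refs.each { |ref| ref.embedding = ref.record.bytes.first(3) }
  end
end

refs = NoteRefSketch.preprocess([1, 2, 3].map { |id| NoteRefSketch.new(id) })
puts refs.map(&:identifier).inspect # prints [1, 2]
```

Ref `3` has no backing record, so the preload step drops it before the embeddings step runs. Ordering matters for that reason: later preprocessors can rely on what earlier ones attached.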
#### Preload
Requires the reference class to define `model_klass`, and `model_klass` to define `preload_indexing_data`.

```ruby
add_preprocessor :preload do |refs|
  preload(refs)
end
```
#### ContentFetcher
Fetches content from existing documents in the vector store using a query.

```ruby
add_preprocessor :get_content do |refs|
  identifiers = refs.map(&:identifier)
  query = ActiveContext::Query.filter(id: identifiers).limit(identifiers.count)

  fetch_content(refs: refs, query: query, collection: Collections::Code)
end
```
#### Embeddings
Generates embeddings either by specifying a content method or by specifying a content field on existing documents.
When documents with a populated content field already exist:

```ruby
add_preprocessor :embeddings do |refs|
  apply_embeddings(refs: refs, content_field: :content)
end
```

When the ref doesn't have existing documents:

```ruby
add_preprocessor :embeddings do |refs|
  apply_embeddings(refs: refs, content_method: :title_and_description)
end

def title_and_description
  "Title: #{database_record.title}\n\nDescription: #{database_record.description}"
end
```

See how to set the initial embedding model and how to migrate from one embedding model to another.
- `upsert`: Creates or updates documents, and handles cases where a single reference has fewer documents than before by performing a delete cleanup operation. The document content can be full or partial JSON.
- `update`: Updates documents that already exist. The document content can be full or partial JSON.
- `delete`: Deletes all documents belonging to a reference.
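The difference between the three operations, and in particular the cleanup step of `upsert`, can be sketched with an in-memory store. `DocumentStoreSketch` and its methods are hypothetical, and merging partial documents into existing ones is an assumption about what "partial JSON" means here.

```ruby
# Hypothetical in-memory document store illustrating the three operations.
# Documents are keyed by a unique id and tagged with the owning reference.
class DocumentStoreSketch
  def initialize
    @docs = {}
  end

  # Create or update every document, then delete documents that belonged to
  # the reference before but are absent from the new set (the cleanup step).
  def upsert(ref_id, docs)
    docs.each do |doc|
      existing = @docs[doc[:id]] || {}
      @docs[doc[:id]] = existing.merge(doc).merge(ref_id: ref_id)
    end

    keep = docs.map { |doc| doc[:id] }
    @docs.delete_if { |id, doc| doc[:ref_id] == ref_id && !keep.include?(id) }
  end

  # Only touch documents that already exist; partial content is merged in.
  def update(docs)
    docs.each do |doc|
      next unless @docs.key?(doc[:id])

      @docs[doc[:id]] = @docs[doc[:id]].merge(doc)
    end
  end

  # Remove every document belonging to the reference.
  def delete(ref_id)
    @docs.delete_if { |_id, doc| doc[:ref_id] == ref_id }
  end

  def ids_for(ref_id)
    @docs.select { |_id, doc| doc[:ref_id] == ref_id }.keys
  end

  def fetch(id)
    @docs[id]
  end
end

store = DocumentStoreSketch.new
store.upsert('mr-1', [{ id: 'mr-1:0', content: 'a' }, { id: 'mr-1:1', content: 'b' }, { id: 'mr-1:2', content: 'c' }])

# The reference now produces one document fewer, so 'mr-1:2' is cleaned up.
store.upsert('mr-1', [{ id: 'mr-1:0', content: 'a2' }, { id: 'mr-1:1', content: 'b' }])
puts store.ids_for('mr-1').inspect # prints ["mr-1:0", "mr-1:1"]

# update ignores ids that do not exist yet.
store.update([{ id: 'mr-1:1', title: 'partial' }, { id: 'mr-1:9', title: 'ignored' }])
```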
Example for a reference reading from a database relation, with preloading and bulk embedding generation:

```ruby
# frozen_string_literal: true

module Ai
  module Context
    module References
      class MergeRequest < ::ActiveContext::Reference
        add_preprocessor :preload do |refs|
          preload(refs)
        end

        add_preprocessor :embeddings do |refs|
          apply_embeddings(refs: refs, target_field: :embeddings, content_method: :title_and_description)
        end

        def self.model_klass
          ::MergeRequest
        end

        def self.serialize_data(merge_request)
          { identifier: merge_request.id }
        end

        attr_accessor :identifier, :embedding
        attr_writer :database_record

        def init
          @identifier, _ = serialized_args
        end

        def serialized_attributes
          [identifier]
        end

        def title_and_description
          "Title: #{database_record.title}\n\nDescription: #{database_record.description}"
        end

        def shared_attributes
          {
            iid: database_record.iid,
            namespace_id: database_record.project.id,
            traversal_ids: database_record.project.elastic_namespace_ancestry
          }
        end

        def model_klass
          self.class.model_klass
        end

        def database_record
          @database_record ||= model_klass.find_by_id(identifier)
        end

        def operation
          database_record ? :upsert : :delete
        end
      end
    end
  end
end
```
Example for code embeddings:

```ruby
# frozen_string_literal: true

module Ai
  module Context
    module References
      class CodeEmbeddings < ::ActiveContext::Reference
        add_preprocessor :embeddings do |refs|
          apply_embeddings(refs: refs, content_method: :blob_content)
        end

        attr_accessor :project_id, :identifier, :repository, :blob

        def init
          @project_id, @identifier = serialized_args
          @repository = Project.find(project_id).repository
          @blob = Gitlab::Git::Blob.raw(repository, identifier)
        end

        def serialized_attributes
          [project_id, identifier]
        end

        def blob_content
          blob.data
        end

        def operation
          blob.data ? :upsert : :delete
        end

        def shared_attributes
          {
            project_id: project_id
          }
        end
      end
    end
  end
end
```
A collection maps data to references and specifies a queue to track its references.
To add a new collection, define a class that:

- includes `ActiveContext::Concerns::Collection`
- defines a `self.queue` class method to return the associated queue
- defines a `self.reference_klass` or `self.reference_klasses` class method to return the references for an object
- defines a `self.routing(object)` class method to determine how an object should be routed
- defines a `self.ids_to_objects(ids)` class method to convert ids into objects for redaction

Example:

```ruby
# frozen_string_literal: true

module Ai
  module Context
    module Collections
      class MergeRequest
        include ActiveContext::Concerns::Collection

        def self.collection_name
          'gitlab_active_context_merge_requests'
        end

        def self.queue
          Queues::MergeRequest
        end

        def self.reference_klass
          References::MergeRequest
        end

        def self.routing(object)
          object.project.root_ancestor.id
        end

        def self.ids_to_objects(ids)
          ::MergeRequest.id_in(ids)
        end
      end
    end
  end
end
```
References can be added to the queue in a few ways.

The preferred method:

```ruby
Ai::Context::Collections::MergeRequest.track!(MergeRequest.first)
Ai::Context::Collections::MergeRequest.track!(MergeRequest.take(10))
```

Passing a collection:

```ruby
ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::MergeRequest)
```

Passing a collection and queue:

```ruby
ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::MergeRequest, queue: Ai::Context::Queues::Default)
```

Building a reference:

```ruby
ref = Ai::Context::References::CodeEmbeddings.new(collection_id: collection.id, routing: project.root_ancestor.id, project_id: project.id, identifier: blob.id)
Ai::Context::Collections::CodeEmbeddings.track!(ref)

ref = Ai::Context::References::CodeEmbeddings.new(collection_id: 24, routing: 24, project_id: 1, identifier: "9ab45314044d664a3b8ac1e05777411482bd0564")
Ai::Context::Collections::CodeEmbeddings.track!(ref)
```

Building a reference and passing a queue:

```ruby
ref = Ai::Context::References::MergeRequest.new(collection_id: collection.id, routing: project.root_ancestor.id, identifier: 1)
ActiveContext.track!(ref, queue: Ai::Context::Queues::MergeRequest)
```

To view all tracked references:

```ruby
ActiveContext::Queues.all_queued_items
```

Once references are tracked, they are executed asynchronously. See Async Processing.
To execute all refs from all queues synchronously, run:

```ruby
ActiveContext.execute_all_queues!
```

To clear a queue:

```ruby
Ai::Context::Queues::MergeRequest.clear_tracking!
```
The `track!` method adds documents to the vector stores and can be called from anywhere: a service, a callback, an event, etc.
The `::ActiveContext::Concerns::Syncable` concern can be added to ActiveRecord models to update a collection on callbacks.
For example, we can add the concern to the `MergeRequest` model to track merge request refs on create, update, and destroy:

```ruby
include ::ActiveContext::Concerns::Syncable

sync_with_active_context on: :create, using: ->(record) { record.track_merge_request! }
sync_with_active_context on: :update, condition: -> { (saved_change_to_title? || saved_change_to_description?) }, using: ->(record) { record.track_merge_request! }
sync_with_active_context on: :destroy, using: ->(record) { record.track_merge_request! }

def track_merge_request!
  Ai::Context::Collections::MergeRequest.track!(self)
end

def syncable?
  # some condition to determine whether to track an MR record
end
```
The same approach keeps merge requests up to date when an associated record changes. For example, if a merge request document contains `project.visibility_level`, add the following to the `Project` model to update its associated merge requests:

```ruby
include ::ActiveContext::Concerns::Syncable

sync_with_active_context on: :update,
  condition: -> { saved_change_to_visibility_level? },
  using: ->(project) { Ai::Context::Collections::MergeRequest.track!(project.merge_requests) }

def syncable?
  # some condition to determine whether or not the project is being indexed
end
```
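How `condition`, `using`, and `syncable?` might interact can be sketched without Rails. Everything below is hypothetical: `SyncCallbacksSketch`, `sync_on`, and `fire` are invented names, and the rule that a callback runs only when both `syncable?` and its condition hold is an assumption, not confirmed behaviour of `ActiveContext::Concerns::Syncable`.

```ruby
# Hypothetical, framework-free imitation of conditional sync callbacks.
# Assumption: a callback fires only when syncable? and its condition both hold.
module SyncCallbacksSketch
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def sync_rules
      @sync_rules ||= []
    end

    # Register a rule for an event; the condition defaults to always true.
    def sync_on(event, using:, condition: -> { true })
      sync_rules << { event: event, condition: condition, using: using }
    end
  end

  # Stand-in for the model lifecycle hook that would run after a commit.
  def fire(event)
    return unless syncable?

    self.class.sync_rules.each do |rule|
      next unless rule[:event] == event
      # Evaluate the condition in the context of the record itself.
      next unless instance_exec(&rule[:condition])

      rule[:using].call(self)
    end
  end
end

class ProjectSketch
  include SyncCallbacksSketch

  # Records which projects had their merge requests re-tracked.
  def self.tracked
    @tracked ||= []
  end

  attr_reader :name

  def initialize(name, visibility_changed:, indexed: true)
    @name = name
    @visibility_changed = visibility_changed
    @indexed = indexed
  end

  def visibility_changed?
    @visibility_changed
  end

  def syncable?
    @indexed
  end

  sync_on :update,
    condition: -> { visibility_changed? },
    using: ->(project) { ProjectSketch.tracked << project.name }
end

ProjectSketch.new('changed', visibility_changed: true).fire(:update)
ProjectSketch.new('unchanged', visibility_changed: false).fire(:update)
ProjectSketch.new('not-indexed', visibility_changed: true, indexed: false).fire(:update)
puts ProjectSketch.tracked.inspect # prints ["changed"]
```

Only the first project is tracked: the second fails its condition and the third is excluded by `syncable?` before any rule is checked.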
To search a collection, build a query and pass it to the collection's `search` method:

```ruby
# Find documents in a project, limited to one result.
query = ActiveContext::Query.filter(project_id: 1).limit(1)
results = Ai::Context::Collections::MergeRequest.search(user: current_user, query: query)
results.to_a

# Find the nearest neighbour of an embedding generated from text, within a project.
target_embedding = Ai::Context::Collections::MergeRequest.search_embedding_model.generate_embeddings("some text")
query = ActiveContext::Query.filter(project_id: 1).knn(target: "embeddings", vector: target_embedding, k: 1)
results = Ai::Context::Collections::MergeRequest.search(user: current_user, query: query)
results.to_a
```
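Conceptually, a filtered k-nearest-neighbour query narrows the candidates with the filter and then ranks them by vector similarity. The sketch below does this over in-memory hashes. The helper names are hypothetical and cosine similarity is an assumed metric; a real vector store performs this server-side with its own index and configured distance function.

```ruby
# Conceptual sketch of a filtered k-nearest-neighbour search over in-memory
# documents. Helper names and the cosine metric are assumptions.
def cosine_similarity(a, b)
  dot = a.zip(b).sum { |x, y| x * y }
  norm = Math.sqrt(a.sum { |x| x * x }) * Math.sqrt(b.sum { |x| x * x })
  norm.zero? ? 0.0 : dot / norm
end

def knn_search(documents, filter:, vector:, k:)
  documents
    # Keep only documents matching every filter field.
    .select { |doc| filter.all? { |field, value| doc[field] == value } }
    # Return the k most similar documents, most similar first.
    .max_by(k) { |doc| cosine_similarity(doc[:embeddings], vector) }
end

documents = [
  { id: 1, project_id: 1, embeddings: [1.0, 0.0] },
  { id: 2, project_id: 1, embeddings: [0.0, 1.0] },
  { id: 3, project_id: 2, embeddings: [1.0, 0.0] }
]

results = knn_search(documents, filter: { project_id: 1 }, vector: [0.9, 0.1], k: 1)
puts results.map { |doc| doc[:id] }.inspect # prints [1]
```

Document `3` has the same embedding as document `1` but is excluded by the `project_id` filter, which mirrors chaining `filter` before `knn` in the query.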