Ray Serve is a simple high-throughput model serving library that can wrap around your ML model.
In this example, we train an external scikit-learn model to use for making predictions.
Let's look at an actual model wrapped by a class that meets the requirements, namely the /train and /predict endpoints described after the code.
import json
import time

import pandas as pd
import ray
from fastapi import Request, FastAPI
from ray import serve
from sklearn.linear_model import LogisticRegression

app = FastAPI()


async def parse_req(request: Request):
    data = await request.json()
    target = data.get('target', None)
    # The 'df' field is itself a JSON string, so it is decoded separately.
    di = json.loads(data['df'])
    df = pd.DataFrame(di)
    return df, target


@serve.deployment(route_prefix="/my_model")
@serve.ingress(app)
class MyModel:
    @app.post("/train")
    async def train(self, request: Request):
        df, target = await parse_req(request)
        # Every column except the target is used as a feature.
        self.feature_cols = [col for col in df.columns if col != target]
        X = df.loc[:, self.feature_cols]
        Y = list(df[target])
        self.model = LogisticRegression()
        self.model.fit(X, Y)
        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        df, _ = await parse_req(request)
        X = df.loc[:, self.feature_cols]
        predictions = self.model.predict(X)
        index = list(range(len(predictions)))
        pred_dict = {'prediction': [float(x) for x in predictions], 'index': index}
        return pred_dict


my_app = MyModel.bind()
After saving the above code into rayserve.py, run it using serve run rayserve:my_app.
It is important to have the /train and /predict endpoints.

The /train endpoint accepts two parameters to be sent via POST:

- df is a serialized dictionary that can be converted into a pandas dataframe.
- target is the name of the target column to be predicted.

It returns a JSON object containing the status key and the ok value.

The /predict endpoint requires one parameter to be sent via POST:

- df is a serialized dictionary that can be converted into a pandas dataframe.

It returns a dictionary containing the prediction and index keys. The prediction key stores the predictions. Additional keys can be returned for confidence and confidence intervals.
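To make the endpoint contract above concrete, here is a minimal client-side sketch that builds request bodies the parse_req function would accept. It assumes the df value is a JSON string of a column-oriented dictionary, since that is what json.loads followed by pd.DataFrame can read; the exact layout MindsDB sends may differ. The helper names build_payload, post_json and smoke_test, and the toy values, are hypothetical.

```python
import json
import urllib.request


def build_payload(columns, target=None):
    """Build a request body that parse_req can decode.

    `columns` maps column names to lists of values (an assumed layout).
    """
    body = {"df": json.dumps(columns)}  # 'df' is itself a JSON string
    if target is not None:
        body["target"] = target  # only /train needs the target column
    return body


def post_json(url, body):
    """POST `body` as JSON and return the decoded JSON response."""
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


def smoke_test(base_url, train_body, predict_body):
    """Train, then predict, against a running deployment (not called here)."""
    print(post_json(base_url + "/train", train_body))
    print(post_json(base_url + "/predict", predict_body))


# Toy data; the values are made up for illustration.
rentals = {
    "number_of_rooms": [1, 2, 3],
    "initial_price": [1000, 2000, 3000],
    "rental_price": [1100, 2100, 3100],
}
train_body = build_payload(rentals, target="number_of_rooms")
features = {k: v for k, v in rentals.items() if k != "number_of_rooms"}
predict_body = build_payload(features)
```

With the service from rayserve.py running, calling smoke_test("http://127.0.0.1:8000/my_model", train_body, predict_body) would exercise both endpoints by hand before connecting MindsDB.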
Once you start the RayServe-wrapped model, you can create and train it in MindsDB.
CREATE MODEL mindsdb.byom_ray_serve
FROM mydb (
SELECT number_of_rooms, initial_price, rental_price
FROM test_data.home_rentals
)
PREDICT number_of_rooms
USING
url.train = 'http://127.0.0.1:8000/my_model/train',
url.predict = 'http://127.0.0.1:8000/my_model/predict',
dtype_dict={"number_of_rooms": "categorical", "initial_price": "integer", "rental_price": "integer"},
format='ray_server';
Now, you can fetch predictions using the standard MindsDB syntax. Follow the
guide on the SELECT statement to learn more.
You can directly pass input data in the WHERE clause to get a single
prediction.
SELECT *
FROM byom_ray_serve
WHERE initial_price=3000
AND rental_price=3000;
Or you can JOIN the model with a data table to get bulk predictions.
SELECT tb.number_of_rooms, t.rental_price, tb.index
FROM mydb.test_data.home_rentals AS t
JOIN mindsdb.byom_ray_serve AS tb
WHERE t.rental_price > 5300;
<Tip>
Please note that if your model is behind a reverse proxy such as nginx, you might have to increase the maximum size limit for POST requests in order to receive the training data. MindsDB can send as much data as you'd like; it has been stress-tested with over a billion rows.
</Tip>

Here, we consider a natural language processing (NLP) task where we want to train a neural network using Keras to detect whether a tweet is related to a natural disaster, such as a fire or an earthquake. Please download this dataset to follow the example.
We create a Ray Serve service that wraps the Kaggle NLP model so that it can be trained and used for making predictions.
import re
import time
import json
import string

import ray
from ray import serve
import numpy as np
import pandas as pd
from tqdm import tqdm
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from fastapi import Request, FastAPI
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, SpatialDropout1D
from tensorflow.keras.initializers import Constant
from tensorflow.keras.optimizers import Adam

app = FastAPI()
stop = set(stopwords.words('english'))


async def parse_req(request: Request):
    data = await request.json()
    target = data.get('target', None)
    di = json.loads(data['df'])
    df = pd.DataFrame(di)
    return df, target


@serve.deployment(route_prefix="/nlp_kaggle_model")
@serve.ingress(app)
class Model:
    MAX_LEN = 100
    GLOVE_DIM = 50
    EPOCHS = 10

    def __init__(self):
        self.model = None

    @app.post("/train")
    async def train(self, request: Request):
        df, target = await parse_req(request)
        target_arr = df.pop(target).values
        df = self.preprocess_df(df)
        train_corpus = self.create_corpus(df)

        # Load the pre-trained GloVe vectors: one word and its vector per line.
        self.embedding_dict = {}
        with open('./glove.6B.50d.txt', 'r', encoding='utf-8') as f:
            for line in f:
                values = line.split()
                word = values[0]
                vectors = np.asarray(values[1:], 'float32')
                self.embedding_dict[word] = vectors

        # Turn each tweet into a fixed-length sequence of word indexes.
        self.tokenizer_obj = Tokenizer()
        self.tokenizer_obj.fit_on_texts(train_corpus)
        sequences = self.tokenizer_obj.texts_to_sequences(train_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        # Build the embedding matrix; words missing from GloVe keep a zero vector.
        word_index = self.tokenizer_obj.word_index
        num_words = len(word_index) + 1
        embedding_matrix = np.zeros((num_words, self.__class__.GLOVE_DIM))
        for word, i in tqdm(word_index.items()):
            if i > num_words:
                continue
            emb_vec = self.embedding_dict.get(word)
            if emb_vec is not None:
                embedding_matrix[i] = emb_vec

        self.model = Sequential()
        embedding = Embedding(num_words,
                              self.__class__.GLOVE_DIM,
                              embeddings_initializer=Constant(embedding_matrix),
                              input_length=self.__class__.MAX_LEN,
                              trainable=False)
        self.model.add(embedding)
        self.model.add(SpatialDropout1D(0.2))
        self.model.add(LSTM(64, dropout=0.2, recurrent_dropout=0.2))
        self.model.add(Dense(1, activation='sigmoid'))
        optimizer = Adam(learning_rate=1e-5)
        self.model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        X_train, X_test, y_train, y_test = train_test_split(df, target_arr, test_size=0.15)
        self.model.fit(X_train, y_train, batch_size=4, epochs=self.__class__.EPOCHS, validation_data=(X_test, y_test), verbose=2)
        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        df, _ = await parse_req(request)
        df = self.preprocess_df(df)
        test_corpus = self.create_corpus(df)
        sequences = self.tokenizer_obj.texts_to_sequences(test_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]
        # Round the sigmoid outputs to hard 0/1 labels.
        y_pre = self.model.predict(df)
        y_pre = np.round(y_pre).astype(int).flatten().tolist()
        sub = pd.DataFrame({'target': y_pre})
        pred_dict = {'prediction': [float(x) for x in sub['target'].values]}
        return pred_dict

    def preprocess_df(self, df):
        # Copy the slice so the assignments below do not modify a view.
        df = df[['text']].copy()
        df['text'] = df['text'].apply(self.remove_URL)
        df['text'] = df['text'].apply(self.remove_html)
        df['text'] = df['text'].apply(self.remove_emoji)
        df['text'] = df['text'].apply(self.remove_punct)
        return df

    def remove_URL(self, text):
        url = re.compile(r'https?://\S+|www\.\S+')
        return url.sub(r'', text)

    def remove_html(self, text):
        html = re.compile(r'<.*?>')
        return html.sub(r'', text)

    def remove_punct(self, text):
        table = str.maketrans('', '', string.punctuation)
        return text.translate(table)

    def remove_emoji(self, text):
        emoji_pattern = re.compile("["
                                   u"\U0001F600-\U0001F64F"  # emoticons
                                   u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                                   u"\U0001F680-\U0001F6FF"  # transport & map symbols
                                   u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                                   u"\U00002702-\U000027B0"
                                   u"\U000024C2-\U0001F251"
                                   "]+", flags=re.UNICODE)
        return emoji_pattern.sub(r'', text)

    def create_corpus(self, df):
        corpus = []
        for tweet in tqdm(df['text']):
            # Keep alphabetic, non-stopword tokens and lowercase them.
            words = [word.lower() for word in word_tokenize(tweet) if word.isalpha() and word not in stop]
            corpus.append(words)
        return corpus


if __name__ == '__main__':
    ray.init()
    serve.start(detached=True)
    # Model.deploy() belongs to the older Ray Serve API; newer Ray releases
    # use serve.run(Model.bind()) instead.
    Model.deploy()
    while True:
        time.sleep(1)
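The text cleaning and padding steps in the service above can be tried in isolation, without Ray, NLTK or TensorFlow. The following sketch is only an approximation under stated assumptions: clean applies the URL, HTML and punctuation removal in the same order as preprocess_df (emoji removal is left out for brevity), str.split stands in for word_tokenize, and pad_post is a hypothetical pure-Python stand-in for pad_sequences with truncating='post' and padding='post'.

```python
import re
import string

URL_RE = re.compile(r"https?://\S+|www\.\S+")
HTML_RE = re.compile(r"<.*?>")
PUNCT_TABLE = str.maketrans("", "", string.punctuation)


def clean(text):
    """Strip URLs, then HTML tags, then punctuation."""
    text = URL_RE.sub("", text)
    text = HTML_RE.sub("", text)
    return text.translate(PUNCT_TABLE)


def pad_post(seq, maxlen, value=0):
    """Truncate at the end, then right-pad with `value` up to `maxlen`."""
    seq = list(seq)[:maxlen]
    return seq + [value] * (maxlen - len(seq))


# A made-up tweet containing markup, punctuation and a link.
sample = "Forest fire near <b>La Ronge</b>, see https://t.co/xyz now!"
words = clean(sample).split()

# Made-up word-index sequences, one shorter and one longer than maxlen.
padded_short = pad_post([5, 9, 2], maxlen=5)
padded_long = pad_post([5, 9, 2, 7, 1, 4], maxlen=5)
```

The order of the cleaning steps matters: removing punctuation first would break up the URL and the HTML tags so that the later patterns no longer match them.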
Now, we need access to the training data. For that, we create a table called
nlp_kaggle_train to load the
dataset that the original model
uses. The nlp_kaggle_train table contains the following columns:
id INT,
keyword VARCHAR(255),
location VARCHAR(255),
text VARCHAR(5000),
target INT
Please note that the specifics of the schema/table and how to ingest the CSV data vary depending on your database.
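As one illustration of the ingestion step, the sketch below loads CSV rows into a table with the columns listed above. It uses Python's built-in sqlite3 module and an in-memory database purely as a stand-in; the example in this guide reads from a different database, and the load_rows helper and the two sample rows are made up for demonstration.

```python
import csv
import io
import sqlite3

# Two made-up rows with a header matching the columns listed above.
SAMPLE_CSV = """id,keyword,location,text,target
1,,,Forest fire near La Ronge Sask. Canada,1
2,,,What a lovely day,0
"""


def load_rows(conn, csv_file):
    """Create nlp_kaggle_train, insert every CSV row, and return the row count."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS nlp_kaggle_train (
            id INT,
            keyword VARCHAR(255),
            location VARCHAR(255),
            text VARCHAR(5000),
            target INT
        )
        """
    )
    reader = csv.DictReader(csv_file)
    rows = [
        (
            int(r["id"]),
            r["keyword"] or None,   # empty strings become NULL
            r["location"] or None,
            r["text"],
            int(r["target"]),
        )
        for r in reader
    ]
    conn.executemany("INSERT INTO nlp_kaggle_train VALUES (?, ?, ?, ?, ?)", rows)
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
inserted = load_rows(conn, io.StringIO(SAMPLE_CSV))
stored = conn.execute(
    "SELECT text, target FROM nlp_kaggle_train ORDER BY id"
).fetchall()
```

For a real dataset, the io.StringIO object would be replaced by an open file handle, and the connection by one to the database that MindsDB is connected to.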
Now, we can create and train this custom model in MindsDB.
CREATE MODEL mindsdb.byom_ray_serve_nlp
FROM maria (
SELECT text, target
FROM test.nlp_kaggle_train
) PREDICT target
USING
url.train = 'http://127.0.0.1:8000/nlp_kaggle_model/train',
url.predict = 'http://127.0.0.1:8000/nlp_kaggle_model/predict',
dtype_dict={"text": "rich_text", "target": "integer"},
format='ray_server';
The training process takes some time, considering that this model is a neural network rather than a simple logistic regression.
You can check the model status using this query:
DESCRIBE byom_ray_serve_nlp;
Once the status of the predictor has a value of trained, you can fetch
predictions using the standard MindsDB syntax. Follow the guide on the
SELECT statement to learn more.
SELECT *
FROM mindsdb.byom_ray_serve_nlp
WHERE text='The tsunami is coming, seek high ground';
The expected output of the query above is 1.
SELECT *
FROM mindsdb.byom_ray_serve_nlp
WHERE text='This is lovely dear friend';
The expected output of the query above is 0.
<Tip>
If your results do not match this example, try training the model for more epochs.
</Tip>

<Tip>
**Get More Insights**

Check out the article on How to bring your own machine learning model to databases by Patricio Cerda Mardini to learn more.
</Tip>