notebooks/piper_inference_(ONNX).ipynb
<a href="https://colab.research.google.com/github/rmcpantoja/piper/blob/master/notebooks/piper_inference_(ONNX).ipynb" target="_parent"></a>
#@title Install software and settings
#@markdown The speech synthesizer and other important dependencies will be installed in this cell. But first, some settings:
#@markdown #### Enhable Enhanced Accessibility?
#@markdown This Enhanced Accessibility functionality is designed for the visually impaired, in which most of the interface can be used by voice guides.
enhanced_accessibility = True #@param {type:"boolean"}
#@markdown ---
#@markdown #### Please select your language:
lang_select = "English" #@param ["English", "Spanish"]
if lang_select == "English":
lang = "en"
elif lang_select == "Spanish":
lang = "es"
else:
raise Exception("Language not supported.")
#@markdown ---
#@markdown #### Do you want to use the GPU for inference?
#@markdown The GPU can be enabled in the edit/notebook settings menu, and this step must be done before connecting to a runtime. The GPU can lead to a higher response speed in inference, but you can use the CPU, for example, if your colab runtime to use GPU's has been ended.
use_gpu = True #@param {type:"boolean"}
if enhanced_accessibility:
from google.colab import output
guideurl = f"https://github.com/rmcpantoja/piper/blob/master/notebooks/wav/{lang}"
def playaudio(filename, extension = "wav"):
return output.eval_js(f'new Audio("{guideurl}/{filename}.{extension}?raw=true").play()')
%cd /content
print("Installing...")
if enhanced_accessibility:
playaudio("installing")
!git clone -q https://github.com/rmcpantoja/piper
%cd /content/piper/src/python
#!pip install -q -r requirements.txt
!pip install -q cython>=0.29.0 piper-phonemize==1.1.0 librosa>=0.9.2 numpy>=1.19.0 onnxruntime>=1.11.0 pytorch-lightning==1.7.0 torch==1.11.0
!pip install -q onnxruntime-gpu
!bash build_monotonic_align.sh
import os
if not os.path.exists("/content/piper/src/python/lng"):
!cp -r "/content/piper/notebooks/lng" /content/piper/src/python/lng
import sys
sys.path.append('/content/piper/notebooks')
from translator import *
lan = Translator()
print("Checking GPU...")
gpu_info = !nvidia-smi
if use_gpu and any('not found' in info for info in gpu_info[0].split(':')):
if enhanced_accessibility:
playaudio("nogpu")
raise Exception(lan.translate(lang, "The Use GPU checkbox is checked, but you don't have a GPU runtime."))
elif not use_gpu and not any('not found' in info for info in gpu_info[0].split(':')):
if enhanced_accessibility:
playaudio("gpuavailable")
raise Exception(lan.translate(lang, "The Use GPU checkbox is unchecked, however you are using a GPU runtime environment. We recommend you check the checkbox to use GPU to take advantage of it."))
if enhanced_accessibility:
playaudio("installed")
print("Success!")
#@title Download your exported model
%cd /content/piper/src/python
import os
#@markdown #### ID or link of the voice package (tar.gz format):
package_url_or_id = "" #@param {type:"string"}
#@markdown ---
if package_url_or_id == "" or package_url_or_id == "http" or package_url_or_id == "1":
if enhanced_accessibility:
playaudio("noid")
raise Exception(lan.translate(lang, "Invalid link or ID!"))
print("Downloading voice package...")
if enhanced_accessibility:
playaudio("downloading")
if package_url_or_id.startswith("1"):
!gdown -q "{package_url_or_id}" -O "voice_package.tar.gz"
elif package_url_or_id.startswith("https://drive.google.com/file/d/"):
!gdown -q "{package_url_or_id}" -O "voice_package.tar.gz" --fuzzy
else:
!wget -q "{package_url_or_id}" -O "voice_package.tar.gz"
if os.path.exists("/content/piper/src/python/voice_package.tar.gz"):
!tar -xf voice_package.tar.gz
print("Voice package downloaded!")
if enhanced_accessibility:
playaudio("downloaded")
else:
if enhanced_accessibility:
playaudio("dwnerror")
raise Exception(lan.translate(lang, "Model failed to download!"))
#@title run inference
#@markdown #### before you enjoy... Some notes!
#@markdown * You can run the cell to download voice packs and download voices you want at any time, even if you run this cell!
#@markdown * When you download a new voice, run this cell again and you will now be able to toggle between all the ones you download. Incredible, right?
#@markdown Enjoy!!
%cd /content/piper/src/python
# original: infer_onnx.py
import json
import logging
import math
import sys
from pathlib import Path
from enum import Enum
from typing import Iterable, List, Optional, Union
import numpy as np
import onnxruntime
from piper_train.vits.utils import audio_float_to_int16
import glob
import ipywidgets as widgets
from IPython.display import display, Audio, Markdown, clear_output
from piper_phonemize import phonemize_codepoints, phonemize_espeak, tashkeel_run
_LOGGER = logging.getLogger("piper_train.infer_onnx")
def detect_onnx_models(path):
onnx_models = glob.glob(path + '/*.onnx')
if len(onnx_models) > 1:
return onnx_models
elif len(onnx_models) == 1:
return onnx_models[0]
else:
return None
def main():
"""Main entry point"""
models_path = "/content/piper/src/python"
logging.basicConfig(level=logging.DEBUG)
providers = [
"CPUExecutionProvider"
if use_gpu is False
else ("CUDAExecutionProvider", {"cudnn_conv_algo_search": "DEFAULT"})
]
sess_options = onnxruntime.SessionOptions()
model = None
onnx_models = detect_onnx_models(models_path)
speaker_selection = widgets.Dropdown(
options=[],
description=f'{lan.translate(lang, "Select speaker")}:',
layout={'visibility': 'hidden'}
)
if onnx_models is None:
if enhanced_accessibility:
playaudio("novoices")
raise Exception(lan.translate(lang, "No downloaded voice packages!"))
elif isinstance(onnx_models, str):
onnx_model = onnx_models
model, config = load_onnx(onnx_model, sess_options, providers)
if config["num_speakers"] > 1:
speaker_selection.options = config["speaker_id_map"].values()
speaker_selection.layout.visibility = 'visible'
preview_sid = 0
if enhanced_accessibility:
playaudio("multispeaker")
else:
speaker_selection.layout.visibility = 'hidden'
preview_sid = None
if enhanced_accessibility:
inferencing(
model,
config,
preview_sid,
lan.translate(
config["espeak"]["voice"][:2],
"Interface openned. Write your texts, configure the different synthesis options or download all the voices you want. Enjoy!"
)
)
else:
voice_model_names = []
for current in onnx_models:
voice_struct = current.split("/")[5]
voice_model_names.append(voice_struct)
if enhanced_accessibility:
playaudio("selectmodel")
selection = widgets.Dropdown(
options=voice_model_names,
description=f'{lan.translate(lang, "Select voice package")}:',
)
load_btn = widgets.Button(
description=lan.translate(lang, "Load it!")
)
config = None
def load_model(button):
nonlocal config
global onnx_model
nonlocal model
nonlocal models_path
selected_voice = selection.value
onnx_model = f"{models_path}/{selected_voice}"
model, config = load_onnx(onnx_model, sess_options, providers)
if enhanced_accessibility:
playaudio("loaded")
if config["num_speakers"] > 1:
speaker_selection.options = config["speaker_id_map"].values()
speaker_selection.layout.visibility = 'visible'
if enhanced_accessibility:
playaudio("multispeaker")
else:
speaker_selection.layout.visibility = 'hidden'
load_btn.on_click(load_model)
display(selection, load_btn)
display(speaker_selection)
speed_slider = widgets.FloatSlider(
value=1,
min=0.25,
max=4,
step=0.1,
description=lan.translate(lang, "Rate scale"),
orientation='horizontal',
)
noise_scale_slider = widgets.FloatSlider(
value=0.667,
min=0.25,
max=4,
step=0.1,
description=lan.translate(lang, "Phoneme noise scale"),
orientation='horizontal',
)
noise_scale_w_slider = widgets.FloatSlider(
value=1,
min=0.25,
max=4,
step=0.1,
description=lan.translate(lang, "Phoneme stressing scale"),
orientation='horizontal',
)
play = widgets.Checkbox(
value=True,
description=lan.translate(lang, "Auto-play"),
disabled=False
)
text_input = widgets.Text(
value='',
placeholder=f'{lan.translate(lang, "Enter your text here")}:',
description=lan.translate(lang, "Text to synthesize"),
layout=widgets.Layout(width='80%')
)
synthesize_button = widgets.Button(
description=lan.translate(lang, "Synthesize"),
button_style='success', # 'success', 'info', 'warning', 'danger' or ''
tooltip=lan.translate(lang, "Click here to synthesize the text."),
icon='check'
)
close_button = widgets.Button(
description=lan.translate(lang, "Exit"),
tooltip=lan.translate(lang, "Closes this GUI."),
icon='check'
)
def on_synthesize_button_clicked(b):
if model is None:
if enhanced_accessibility:
playaudio("nomodel")
raise Exception(lan.translate(lang, "You have not loaded any model from the list!"))
text = text_input.value
if config["num_speakers"] > 1:
sid = speaker_selection.value
else:
sid = None
rate = speed_slider.value
noise_scale = noise_scale_slider.value
noise_scale_w = noise_scale_w_slider.value
auto_play = play.value
inferencing(model, config, sid, text, rate, noise_scale, noise_scale_w, auto_play)
def on_close_button_clicked(b):
clear_output()
if enhanced_accessibility:
playaudio("exit")
synthesize_button.on_click(on_synthesize_button_clicked)
close_button.on_click(on_close_button_clicked)
display(text_input)
display(speed_slider)
display(noise_scale_slider)
display(noise_scale_w_slider)
display(play)
display(synthesize_button)
display(close_button)
def load_onnx(model, sess_options, providers = ["CPUExecutionProvider"]):
_LOGGER.debug("Loading model from %s", model)
config = load_config(model)
model = onnxruntime.InferenceSession(
str(model),
sess_options=sess_options,
providers= providers
)
_LOGGER.info("Loaded model from %s", model)
return model, config
def load_config(model):
with open(f"{model}.json", "r") as file:
config = json.load(file)
return config
PAD = "_" # padding (0)
BOS = "^" # beginning of sentence
EOS = "$" # end of sentence
class PhonemeType(str, Enum):
ESPEAK = "espeak"
TEXT = "text"
def phonemize(config, text: str) -> List[List[str]]:
"""Text to phonemes grouped by sentence."""
if config["phoneme_type"] == PhonemeType.ESPEAK:
if config["espeak"]["voice"] == "ar":
# Arabic diacritization
# https://github.com/mush42/libtashkeel/
text = tashkeel_run(text)
return phonemize_espeak(text, config["espeak"]["voice"])
if config["phoneme_type"] == PhonemeType.TEXT:
return phonemize_codepoints(text)
raise ValueError(f'Unexpected phoneme type: {config["phoneme_type"]}')
def phonemes_to_ids(config, phonemes: List[str]) -> List[int]:
"""Phonemes to ids."""
id_map = config["phoneme_id_map"]
ids: List[int] = list(id_map[BOS])
for phoneme in phonemes:
if phoneme not in id_map:
print("Missing phoneme from id map: %s", phoneme)
continue
ids.extend(id_map[phoneme])
ids.extend(id_map[PAD])
ids.extend(id_map[EOS])
return ids
def inferencing(model, config, sid, line, length_scale = 1, noise_scale = 0.667, noise_scale_w = 0.8, auto_play=True):
audios = []
if config["phoneme_type"] == "PhonemeType.ESPEAK":
config["phoneme_type"] = "espeak"
text = phonemize(config, line)
for phonemes in text:
phoneme_ids = phonemes_to_ids(config, phonemes)
num_speakers = config["num_speakers"]
if num_speakers == 1:
speaker_id = None # for now
else:
speaker_id = sid
text = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0)
text_lengths = np.array([text.shape[1]], dtype=np.int64)
scales = np.array(
[noise_scale, length_scale, noise_scale_w],
dtype=np.float32,
)
sid = None
if speaker_id is not None:
sid = np.array([speaker_id], dtype=np.int64)
audio = model.run(
None,
{
"input": text,
"input_lengths": text_lengths,
"scales": scales,
"sid": sid,
},
)[0].squeeze((0, 1))
audio = audio_float_to_int16(audio.squeeze())
audios.append(audio)
merged_audio = np.concatenate(audios)
sample_rate = config["audio"]["sample_rate"]
display(Markdown(f"{line}"))
display(Audio(merged_audio, rate=sample_rate, autoplay=auto_play))
def denoise(
audio: np.ndarray, bias_spec: np.ndarray, denoiser_strength: float
) -> np.ndarray:
audio_spec, audio_angles = transform(audio)
a = bias_spec.shape[-1]
b = audio_spec.shape[-1]
repeats = max(1, math.ceil(b / a))
bias_spec_repeat = np.repeat(bias_spec, repeats, axis=-1)[..., :b]
audio_spec_denoised = audio_spec - (bias_spec_repeat * denoiser_strength)
audio_spec_denoised = np.clip(audio_spec_denoised, a_min=0.0, a_max=None)
audio_denoised = inverse(audio_spec_denoised, audio_angles)
return audio_denoised
def stft(x, fft_size, hopsamp):
"""Compute and return the STFT of the supplied time domain signal x.
Args:
x (1-dim Numpy array): A time domain signal.
fft_size (int): FFT size. Should be a power of 2, otherwise DFT will be used.
hopsamp (int):
Returns:
The STFT. The rows are the time slices and columns are the frequency bins.
"""
window = np.hanning(fft_size)
fft_size = int(fft_size)
hopsamp = int(hopsamp)
return np.array(
[
np.fft.rfft(window * x[i : i + fft_size])
for i in range(0, len(x) - fft_size, hopsamp)
]
)
def istft(X, fft_size, hopsamp):
"""Invert a STFT into a time domain signal.
Args:
X (2-dim Numpy array): Input spectrogram. The rows are the time slices and columns are the frequency bins.
fft_size (int):
hopsamp (int): The hop size, in samples.
Returns:
The inverse STFT.
"""
fft_size = int(fft_size)
hopsamp = int(hopsamp)
window = np.hanning(fft_size)
time_slices = X.shape[0]
len_samples = int(time_slices * hopsamp + fft_size)
x = np.zeros(len_samples)
for n, i in enumerate(range(0, len(x) - fft_size, hopsamp)):
x[i : i + fft_size] += window * np.real(np.fft.irfft(X[n]))
return x
def inverse(magnitude, phase):
recombine_magnitude_phase = np.concatenate(
[magnitude * np.cos(phase), magnitude * np.sin(phase)], axis=1
)
x_org = recombine_magnitude_phase
n_b, n_f, n_t = x_org.shape # pylint: disable=unpacking-non-sequence
x = np.empty([n_b, n_f // 2, n_t], dtype=np.complex64)
x.real = x_org[:, : n_f // 2]
x.imag = x_org[:, n_f // 2 :]
inverse_transform = []
for y in x:
y_ = istft(y.T, fft_size=1024, hopsamp=256)
inverse_transform.append(y_[None, :])
inverse_transform = np.concatenate(inverse_transform, 0)
return inverse_transform
def transform(input_data):
x = input_data
real_part = []
imag_part = []
for y in x:
y_ = stft(y, fft_size=1024, hopsamp=256).T
real_part.append(y_.real[None, :, :]) # pylint: disable=unsubscriptable-object
imag_part.append(y_.imag[None, :, :]) # pylint: disable=unsubscriptable-object
real_part = np.concatenate(real_part, 0)
imag_part = np.concatenate(imag_part, 0)
magnitude = np.sqrt(real_part**2 + imag_part**2)
phase = np.arctan2(imag_part.data, real_part.data)
return magnitude, phase
main()