notebooks/closed-book-qa/Closed Book QA Generator.ipynb
Creates a set of questions and answers for a given paragraph. Sample topics, questions, and answers can be supplied as few-shot examples.
A JSON input file with the following structure is required ("text" is the only required field):
{
"paragraphs":
[{
"text": "",
"topics": [""],
"questions": [""],
"answers": [""]
}]
}
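For example, a minimal paragraphs.json (contents are illustrative) might look like:
{
"paragraphs":
[{
"text": "The Wright brothers made the first powered flight in 1903 at Kitty Hawk, North Carolina.",
"topics": ["Aviation history"],
"questions": ["Where did the first powered flight take place?"],
"answers": ["Kitty Hawk, North Carolina"]
}]
}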
This notebook will run on a system with a single RTX 3090 (24 GB VRAM) GPU. If you're using Colab, don't forget to change the Runtime to GPU-accelerated!
Inference code and structure provided by @ontocord
(Only required to run once)
model_hf_name = "google/flan-t5-large" # @param {type:"string"}
verbose = False # @param {type:"boolean"}
file_path = "/content/paragraphs.json" # @param {type:"string"}
output_path = "/content/questions_dict.json" # @param {type:"string"}
(Only required to run once)
# Install with pip
!pip install accelerate
!pip install bitsandbytes
!pip install transformers
# Load all necessary libraries
import math
import time
import torch
import random
import json
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, AutoModel
from torch.nn.functional import cosine_similarity
# Device map that places the entire model on GPU 0. Mapping the root
# module ("") to a device is equivalent to listing every encoder/decoder
# block of the 24-block T5 variants individually. A 13B-parameter T5 in
# bfloat16 uses nearly all of a 24 GB card.
device_map_T5_13B = {"": 0}
# Load the model in bfloat16. Use bfloat16 rather than float16 for
# 16-bit T5 inference; the weights were trained in bfloat16.
# The try/except skips reloading when the model is already in memory.
try:
    tokenizer
except NameError:
    tokenizer = AutoTokenizer.from_pretrained(model_hf_name)
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_hf_name,
        device_map=device_map_T5_13B,
        torch_dtype=torch.bfloat16,
        load_in_8bit=False,
    )
minilm_tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
minilm_model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2").half().eval().cuda()
# Inference
def Write_Line(name, value):
    name_stripped = name.strip()
    value_stripped = value.strip()
    return "{}:\n{}\n".format(name_stripped, value_stripped)
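A quick usage note:
# Write_Line("Topic", " Aviation history ") -> "Topic:\nAviation history\n"
# The prompt builders below use this for few-shot example lines.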
# ask_flan_T5 takes a prompt and returns the sampled FLAN-T5
# response along with a crude confidence score: the sum of the
# log-probabilities of the generated tokens.
# Input: input_text (string): the prompt sent to the model
# Output: out_tuple (tuple): a (generated_text, log_prob_score) pair
def ask_flan_T5(input_text):
inputs = tokenizer.encode(input_text, return_tensors="pt").cuda(0)
outputs = model.generate(
inputs,
do_sample=True,
top_p=0.95,
eos_token_id=1,
max_new_tokens=50,
bos_token_id=0,
temperature=0.9,
return_dict_in_generate=True,
output_scores=True,
)
out_text = tokenizer.decode(outputs.sequences[0], skip_special_tokens=True)
    probs = torch.stack(outputs.scores, dim=1).softmax(-1)
    # Sum the log-probabilities of the generated tokens, skipping the
    # decoder-start token; the +0.001 floor avoids math.log(0).
    logprobs = 0
    for counter, k in enumerate(outputs.sequences[0][1:]):
        word_prob = round(probs[0][counter][k.item()].item(), 2) + 0.001
        logprobs = logprobs + math.log(word_prob)
    out_tuple = (out_text, round(logprobs, 2))
return out_tuple
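A usage sketch (assumes the model loaded above; the sampled text and score vary from run to run). The score is a sum of per-token log-probabilities, so values closer to 0 indicate a more confident generation:
text, score = ask_flan_T5("Question: What color is the sky?\nAnswer:\n")
print(text, score)  # e.g. ("blue", -0.12); generate_topic() below keeps results scoring above -4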
# ask_flan_T5D is the deterministic (do_sample=False, i.e. greedy)
# counterpart of ask_flan_T5; it returns the same
# (generated_text, log_prob_score) tuple.
def ask_flan_T5D(input_text):
inputs = tokenizer.encode(input_text, return_tensors="pt").cuda(0)
outputs = model.generate(
inputs,
do_sample=False,
eos_token_id=1,
max_new_tokens=50,
bos_token_id=0,
return_dict_in_generate=True,
output_scores=True,
)
out_text = tokenizer.decode(outputs.sequences[0], skip_special_tokens=True)
    probs = torch.stack(outputs.scores, dim=1).softmax(-1)
    # Same scoring as in ask_flan_T5.
    logprobs = 0
    for counter, k in enumerate(outputs.sequences[0][1:]):
        word_prob = round(probs[0][counter][k.item()].item(), 2) + 0.001
        logprobs = logprobs + math.log(word_prob)
    out_tuple = (out_text, round(logprobs, 2))
    return out_tuple
# Topics
def Get_Topics(paragraph_dict):
output_dict = {}
for id in paragraph_dict:
topic_list = Get_Topic(paragraph_dict[id])
output_dict[id] = topic_list
return output_dict
def Get_Topic(paragraph_item):
topic_list = generate_topic(paragraph_item)
return topic_list
# Generate candidate topic labels for a paragraph of text
def generate_topic(paragraph):
paragraph_text = paragraph["paragraph"]
sample_topics = paragraph["sample topics"]
samples = ""
if len(sample_topics) > 0:
k = random.randint(0, len(sample_topics) - 1)
samples += Write_Line("Topic", sample_topics[k])
    results = set()
    input_text = (
        "Task: Create a topic classifier for the provided paragraph."
        "\nParagraph:\n" + paragraph_text + "\n" + samples + "Topic:\n"
    )
    # Sample up to 20 candidate topics, keeping those scoring above -4.
    for k in range(0, 20):
        result = ask_flan_T5(input_text)
        if result[1] > -4:
            results.add(result)
    # Pad with generic fallbacks so downstream steps always have candidates.
    if len(results) < 3:
        results.add(("I was wondering", -3.3))
        results.add(("I have a question", -3.3))
    sorted_results = Sort_Tuple(list(results))
    return sorted_results[0:5]
# Prefixes
def Get_Prefixes(topic_dict):
    output_dict = {}
    for id in topic_dict:
        prefix_list = Get_Prefix(topic_dict, id)
        output_dict[id] = prefix_list
    return output_dict
def Get_Prefix(topic_dict, id):
    prefix_list = generate_topic_prefix(topic_dict[id])
    return prefix_list
# Generate prepositional phrases that introduce each topic
def generate_topic_prefix(topic_set):
results = set()
for entry in topic_set:
topic = entry[0]
        input_text = (
            "Task: Create a prepositional phrase about the topic.\n"
            "Example 1\nTopic: Climbing Mount Everest\n"
            "Prepositional Phrase: With regards to climbing Mount Everest,\n"
            "Example 2\nTopic: United States Air Force\n"
            "Prepositional Phrase: On the topic of the United States Air Force,\n"
            "Example 3\nTopic: " + topic + "\nPrepositional Phrase: "
        )
for k in range(0, 5):
results.add(ask_flan_T5(input_text))
sorted_results = Sort_Tuple(list(results))
return sorted_results[0:5]
# Questions
def Get_Questions(paragraph_dict, number_of_questions):
output_dict = {}
for id in paragraph_dict:
question_list = Get_Question(paragraph_dict, id, number_of_questions)
output_dict[id] = question_list
return output_dict
def Get_Question(paragraph_dict, id, number_of_questions):
question_list = generate_questions(paragraph_dict[id], number_of_questions)
return question_list
# Generate who/what/where/when/why questions from a paragraph.
# Number of questions variable is an integer which indicates how
# many of each question type to try to generate.
def generate_questions(paragraph, number_of_questions):
paragraph_text = paragraph["paragraph"]
    if len(tokenizer.encode(paragraph_text)) > 480:
        print("Warning: the paragraph is close to the tokenizer's 512-token limit and may be truncated.")
question_set = set()
question_types = ["What", "Where", "Why", "How", "How much", "Who", "When", "Which"]
for qtype in question_types:
question = (
"Please generate a question that starts with '"
+ qtype
+ "' based on the following paragraph.\nText:\n"
+ paragraph_text
+ "\nQuestion:\n"
)
for k in range(0, number_of_questions):
new_question = ask_flan_T5(question)
if qtype in new_question[0]:
question_set.add((qtype, new_question))
return question_set
# Answers
def Get_Answers(paragraph_dict, question_dict):
output_dict = {}
for id in paragraph_dict:
answer_list = Get_Answer(paragraph_dict, id, question_dict[id])
output_dict[id] = answer_list
return output_dict
def Get_Answer(paragraph_dict, id, question_list):
answer_list = generate_answers(paragraph_dict[id], question_list)
return answer_list
# Generate answers for a set of questions.
# Input is the paragraph of text and a set of questions where each question
# is a tuple generated from the generate_questions() function.
def generate_answers(paragraph, question_set):
paragraph_text = paragraph["paragraph"]
sample_questions = paragraph["sample questions"]
sample_answers = paragraph["sample answers"]
possible_answers = set()
for question in question_set:
samples = ""
if len(sample_questions) > 0:
k = random.randint(0, len(sample_questions) - 1)
samples += Write_Line("Question", sample_questions[k])
samples += Write_Line("Answer", sample_answers[k])
        input_text = (
            "Please read the following paragraph and "
            "answer the question using only data "
            "found in the text. If no answer is possible, respond "
            "'NA'.\nParagraph:\n" + paragraph_text + "\n" + samples
            + "Question:\n" + question[1][0] + "\nAnswer:\n"
        )
answer = ask_flan_T5D(input_text)
possible_answers.add((question[0], question[1], answer))
return possible_answers
# Deduced Questions
def Get_Questions2(paragraph_dict, answer_dict):
output_dict = {}
for id in paragraph_dict:
question2_list = Get_Question2(paragraph_dict, id, answer_dict[id])
output_dict[id] = question2_list
return output_dict
def Get_Question2(paragraph_dict, id, answer_list):
question2_list = generate_question2(paragraph_dict[id], answer_list)
return question2_list
# Generate questions from a paragraph and a set of answers.
# Input is the paragraph of text and a set of QA tuples
# generated by the generate_answers() function.
def generate_question2(paragraph, qa_set):
paragraph_text = paragraph["paragraph"]
sample_questions = paragraph["sample questions"]
sample_answers = paragraph["sample answers"]
qaq_results = set()
for qa_item in qa_set:
samples = ""
if len(sample_questions) > 0:
k = random.randint(0, len(sample_questions) - 1)
samples += Write_Line("Answer", sample_answers[k])
samples += Write_Line("Question", sample_questions[k])
answer = qa_item[2][0]
        input_text = (
            "Please read the following paragraph and "
            "generate a question for the given answer."
            "\nParagraph:\n" + paragraph_text + "\n" + samples
            + "Answer:\n" + answer + "\nQuestion:\n"
        )
result = ask_flan_T5D(input_text)
qaq_results.add((qa_item[0], qa_item[1], qa_item[2], result))
return qaq_results
# Answers to Deduced Questions
def Get_Answers2(paragraph_dict, question2_dict):
output_dict = {}
for id in paragraph_dict:
answer2_list = Get_Answer2(paragraph_dict, id, question2_dict[id])
output_dict[id] = answer2_list
return output_dict
def Get_Answer2(paragraph_dict, id, question2_list):
answer_list = generate_answers2(paragraph_dict[id], question2_list)
return answer_list
# Generate answers from a paragraph and a set of questions.
# Input is the paragraph of text and a set of tuples
# generated by the generate_question2() function.
def generate_answers2(paragraph, question2_set):
paragraph_text = paragraph["paragraph"]
possible_answers = set()
for qaq2_item in question2_set:
question2 = qaq2_item[3][0]
        input_text = (
            "Please read the following paragraph and "
            "then answer the question using only data "
            "found in the text. If no answer is possible, respond "
            "'NA'.\nText:\n" + paragraph_text
            + "\nQuestion:\n" + question2 + "\nAnswer:\n"
        )
answer = ask_flan_T5D(input_text)
possible_answers.add((question2, answer))
return possible_answers
# Declaratives
def Get_Declaratives(paragraph_dict, answer2_dict):
output_dict = {}
for id in paragraph_dict:
declarative_list = Get_Declarative(answer2_dict[id])
output_dict[id] = declarative_list
return output_dict
def Get_Declarative(answer2_list):
declarative_list = generate_declarative(answer2_list)
return declarative_list
# Generate declarative statement from question and answer pair.
def generate_declarative(qaq_set):
qaqd_results = set()
for qa_item in qaq_set:
question = qa_item[0]
answer = qa_item[1][0]
if "NA" in answer:
qaqd_results.add((question, answer, qa_item[1]))
else:
            input_text = (
                "Generate a declarative statement based on the "
                "given question and answer pair.\nQ: What is "
                "sitting on the couch?\nA: poodle\nA poodle is "
                "sitting on the couch.\nQ: " + question + "\nA: " + answer + "\n"
            )
result = ask_flan_T5D(input_text)
qaqd_results.add((question, answer, result))
return qaqd_results
# Closed Answers
def Get_Closed_Answers(paragraph_dict, question2_dict, prefix_dict):
output_dict = {}
for id in paragraph_dict:
try:
prefix_list = prefix_dict[id]
except Exception:
prefix_list = None
closed_answer_list = Get_Closed_Answer(question2_dict[id], prefix_list)
output_dict[id] = closed_answer_list
return output_dict
def Get_Closed_Answer(answer_list, prefix_list):
closed_answer_list = generate_closed_answer(answer_list, prefix_list)
return closed_answer_list
# Generate a closed-book answer to each question, optionally prepending a topic prefix.
def generate_closed_answer(qaqd_set, topic_prefix):
    # Use the longest generated prepositional phrase, if any are available.
    if topic_prefix:
        topic_prefix = [a[0] for a in topic_prefix]
        topic_prefix.sort(key=len, reverse=True)
        topic_prefix = topic_prefix[0]
    else:
        topic_prefix = None
    qaqd_results = set()
    for qa_item in qaqd_set:
        question = qa_item[0]
        answer = qa_item[2][0]
        if answer.strip() == "NA":
            # Propagate the NA answer, keeping the tuple width consistent.
            if len(qa_item) == 3:
                qaqd_results.add((qa_item[0], qa_item[1], qa_item[2], qa_item[2]))
            else:
                qaqd_results.add((qa_item[0], qa_item[1], qa_item[2], qa_item[2], qa_item[2]))
else:
            input_text = (
                "Task: Answer the question in a detailed fashion. "
                "If the question cannot be answered without more "
                "information, please answer NA.\nExample 1:\nQuestion: "
                "Why does Shala like cookies?\nAnswer: It is not possible "
                "to know why Shala likes cookies without more information, "
                "but many people that like cookies enjoy their taste or "
                "some of their ingredients (e.g. chocolate chips or "
                "peanut butter).\nExample 2:\nQuestion: Why would someone "
                "vote in an election?\nAnswer: There are many reasons "
                "someone might vote in an election, for instance to have "
                "their voice heard or to help a candidate they like win the "
                "race.\nExample 3:\nQuestion: What decoration goes on top of "
                "a Christmas tree?\nAnswer: Usually a star is placed at the "
                "top of a Christmas tree.\nExample 4:\nQuestion: "
                + (question if topic_prefix is None else (topic_prefix + " " + question))
                + "\nAnswer: "
            )
result = ask_flan_T5D(input_text)
if len(qa_item) == 3:
qaqd_results.add((qa_item[0], qa_item[1], qa_item[2], result))
else:
qaqd_results.add((qa_item[0], qa_item[1], qa_item[2], qa_item[3], result))
return qaqd_results
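Because each stage wraps the previous stage's output, the tuple shapes grow as the pipeline runs. Traced from the functions above (a sketch, not authoritative):
# generate_questions     -> {(qtype, (question, score)), ...}
# generate_answers       -> {(qtype, (question, score), (answer, score)), ...}
# generate_question2     -> {(qtype, (question, score), (answer, score), (question2, score)), ...}
# generate_answers2      -> {(question2, (answer, score)), ...}
# generate_declarative   -> {(question2, answer, (declarative, score)), ...}
# generate_closed_answer -> 4-tuples on the first pass and 5-tuples on the
#                           prefix pass; Get_QA_Dict below indexes these.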
# Tools
# Sort_Tuple sorts a list of tuples
# by the second element.
def Sort_Tuple(tup):
tup.sort(key=lambda x: x[1], reverse=True)
return tup
def Lower_First_Char(text):
    return text[0].lower() + text[1:]
def Format_Answer(answer, score):
if score < 0.75:
output = "I don't know. I cannot tell you the answer with the information I have."
elif score < 0.8:
output = "I don't know for certain, but maybe " + Lower_First_Char(answer)
elif score < 0.9:
output = "I believe " + Lower_First_Char(answer)
else:
output = answer
return output
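A model-free check of the hedging thresholds (scores here are made up):
for s in (0.70, 0.78, 0.85, 0.95):
    print(s, "->", Format_Answer("The sky is blue.", s))
# 0.70 -> I don't know. I cannot tell you the answer with the information I have.
# 0.78 -> I don't know for certain, but maybe the sky is blue.
# 0.85 -> I believe the sky is blue.
# 0.95 -> The sky is blue.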
def mean_pooling(model_output, attention_mask):
with torch.no_grad():
token_embeddings = model_output.last_hidden_state
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
def Get_Mean_Vector(text):
    toks = minilm_tokenizer(text, padding=True, truncation=True, return_tensors="pt").to("cuda")
    with torch.no_grad():
        dat = minilm_model(**toks)
    return mean_pooling(dat, toks.attention_mask)
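These embeddings drive the open-book vs. closed-book answer comparison in Get_QA_Dict below; a small sketch (assumes the MiniLM model loaded earlier and a CUDA device; the sentences are illustrative):
vec_a = Get_Mean_Vector("Kitty Hawk, North Carolina")
vec_b = Get_Mean_Vector("The first flight took place at Kitty Hawk.")
print(cosine_similarity(vec_a, vec_b).item())  # nearer 1.0 means more similar answers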
def Truncate_String(text, length):
    if len(text) > length:
        text = text[: length - 3] + "..."
    return text
# Dictionary Management
# Discards paragraphs that are too long or don't have the same number of questions and answers.
# Input: paragraphs (list): A list of dictionaries containing the text, sample questions and sample answers of a paragraph
def Fix_Paragraphs(paragraphs):
fixed_paragraphs = []
for paragraph in paragraphs:
text = paragraph["text"]
text_trunc = Truncate_String(text, 50)
        questions = paragraph.get("questions", [])
        answers = paragraph.get("answers", [])
        if len(questions) != len(answers):
            if verbose:
                print(text_trunc, "Questions and answers must have the same number of items!")
continue
if verbose:
print(text_trunc, len(text), "chars")
if len(text) > 0 and len(text) <= 1100:
fixed_paragraphs.append(paragraph)
print("\nOriginal number of paragraphs:", len(paragraphs))
print("Length filtered number of paragraphs:", len(fixed_paragraphs))
return fixed_paragraphs
def Init_Dictionary(paragraphs):
paragraph_dict = {}
uniq_id = 100000
for paragraph in paragraphs:
paragraph_dict[uniq_id] = {}
paragraph_dict[uniq_id]["paragraph"] = paragraph["text"]
paragraph_dict[uniq_id]["sample topics"] = paragraph["topics"]
paragraph_dict[uniq_id]["sample questions"] = paragraph["questions"]
paragraph_dict[uniq_id]["sample answers"] = paragraph["answers"]
uniq_id += 1
return paragraph_dict
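After initialization, each entry has this shape (values illustrative, matching the sample input above):
# paragraph_dict[100000] == {
#     "paragraph": "The Wright brothers made the first powered flight in 1903 ...",
#     "sample topics": ["Aviation history"],
#     "sample questions": ["Where did the first powered flight take place?"],
#     "sample answers": ["Kitty Hawk, North Carolina"],
# }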
def Attach_Generated_Content(paragraph_dict, topic_dict, prefix_dict, prefix_answer_dict):
for id in paragraph_dict.keys():
paragraph_dict[id]["topics"] = topic_dict[id]
paragraph_dict[id]["topic prepositions"] = prefix_dict[id]
paragraph_dict[id]["QA_set"] = Get_QA_Dict(prefix_answer_dict[id])
def Get_QA_Dict(prefix_answer_list):
k = 0
output_dict = {}
for entry in prefix_answer_list:
output_dict[k] = {}
output_dict[k]["question"] = entry[0]
output_dict[k]["answer_T5_ob"] = entry[2][0]
output_dict[k]["answer_T5_cb"] = entry[3][0]
output_dict[k]["answer_T5_cb_with_prefix"] = entry[4][0]
        if output_dict[k]["answer_T5_ob"] == "NA":
            output_dict[k]["answer_T5_answer"] = (
                "Either I do not understand this question, or this question cannot be answered."
            )
else:
answer_ob = output_dict[k]["answer_T5_ob"]
dat_ob = Get_Mean_Vector(answer_ob)
answer_cb = output_dict[k]["answer_T5_cb"]
dat_cb = Get_Mean_Vector(answer_cb)
score_cb = cosine_similarity(dat_ob, dat_cb).item()
output_dict[k]["answer_T5_answer"] = Format_Answer(answer_ob, score_cb)
answer_prefix_format = output_dict[k]["answer_T5_answer"]
if len(answer_cb) < len(output_dict[k]["answer_T5_cb_with_prefix"]):
answer_prefix = output_dict[k]["answer_T5_cb_with_prefix"]
dat_prefix = Get_Mean_Vector(answer_prefix)
score_prefix = cosine_similarity(dat_ob, dat_prefix).item()
if score_cb < score_prefix:
answer_prefix_format = Format_Answer(answer_ob, score_prefix)
output_dict[k]["answer_T5_answer_with_prefix"] = answer_prefix_format
k += 1
return output_dict
with open(file_path, "r") as f:
    root = json.load(f)
paragraphs = root["paragraphs"]
paragraphs_fixed = Fix_Paragraphs(paragraphs)
if len(paragraphs_fixed) == 0:
    raise ValueError("No valid paragraphs found.")
paragraph_dict = Init_Dictionary(paragraphs_fixed)
# @title Generate topics
start_time = time.perf_counter()
topic_dict = Get_Topics(paragraph_dict)
stop_time = time.perf_counter()
generation_time = stop_time - start_time
print("Topic generation time: " + str(generation_time))
if verbose:
for topic_key in topic_dict:
print(" {}:".format(topic_key))
print(*topic_dict[topic_key], sep="\n")
# @title Generate prefixes
start_time = time.perf_counter()
prefix_dict = Get_Prefixes(topic_dict)
stop_time = time.perf_counter()
generation_time = stop_time - start_time
print("Prefix generation time: " + str(generation_time))
if verbose:
for prefix_key in prefix_dict:
print(" {}:".format(prefix_key))
print(*prefix_dict[prefix_key], sep="\n")
# @title Generate questions
start_time = time.perf_counter()
question_dict = Get_Questions(paragraph_dict, 2)
stop_time = time.perf_counter()
generation_time = stop_time - start_time
print("Question generation time: " + str(generation_time))
if verbose:
for question_key in question_dict:
print(" {}:".format(question_key))
print(*question_dict[question_key], sep="\n")
# @title Generate answers
start_time = time.perf_counter()
answer_dict = Get_Answers(paragraph_dict, question_dict)
stop_time = time.perf_counter()
generation_time = stop_time - start_time
print("Answer generation time: " + str(generation_time))
if verbose:
for answer_key in answer_dict:
print(" {}:".format(answer_key))
print(*answer_dict[answer_key], sep="\n")
# @title Generate questions from answers
start_time = time.perf_counter()
question2_dict = Get_Questions2(paragraph_dict, answer_dict)
stop_time = time.perf_counter()
generation_time = stop_time - start_time
print("Question from answer generation time: " + str(generation_time))
if verbose:
for question2_key in question2_dict:
print(" {}:".format(question2_key))
print(*question2_dict[question2_key], sep="\n")
# @title Generate answers to questions from answers
start_time = time.perf_counter()
answer2_dict = Get_Answers2(paragraph_dict, question2_dict)
stop_time = time.perf_counter()
generation_time = stop_time - start_time
print("Answer to question from answer generation time: " + str(generation_time))
if verbose:
for answer2_key in answer2_dict:
print(" {}:".format(answer2_key))
print(*answer2_dict[answer2_key], sep="\n")
# @title Generate declaratives
start_time = time.perf_counter()
declarative_dict = Get_Declaratives(paragraph_dict, answer2_dict)
stop_time = time.perf_counter()
generation_time = stop_time - start_time
print("Declarative generation time: " + str(generation_time))
if verbose:
for declarative_key in declarative_dict:
print(" {}:".format(declarative_key))
print(*declarative_dict[declarative_key], sep="\n")
# @title Generate closed answers
start_time = time.perf_counter()
closed_answer_dict = Get_Closed_Answers(paragraph_dict, declarative_dict, None)
stop_time = time.perf_counter()
generation_time = stop_time - start_time
print("Closed answer generation time: " + str(generation_time))
if verbose:
for closed_answer_key in closed_answer_dict:
print(" {}:".format(closed_answer_key))
print(*closed_answer_dict[closed_answer_key], sep="\n")
# @title Generate closed answers with prefix
start_time = time.perf_counter()
prefix_answer_dict = Get_Closed_Answers(paragraph_dict, closed_answer_dict, prefix_dict)
stop_time = time.perf_counter()
generation_time = stop_time - start_time
print("Closed answer with prefix generation time: " + str(generation_time))
if verbose:
for prefix_answer_key in prefix_answer_dict:
print(" {}:".format(prefix_answer_key))
print(*prefix_answer_dict[prefix_answer_key], sep="\n")
Attach_Generated_Content(paragraph_dict, topic_dict, prefix_dict, prefix_answer_dict)
with open(output_path, "w") as output:
    output_data = {"paragraphs": paragraph_dict}
    output.write(json.dumps(output_data, indent=2))
print("Saved output to", output_path)