Source code for TCT.translator_query
import requests
from copy import deepcopy
import json
import pandas
from TCT import translator_metakg
from TCT import translator_kpinfo
[docs]
def get_translator_API_predicates():
'''
Get the predicates supported by each API.
Returns
--------
A dictionary of API names and their predicates.
Examples
--------
>>> API_predicates = get_translator_API_predicates()
'''
Translator_KP_info,APInames= translator_kpinfo.get_translator_kp_info()
print(len(Translator_KP_info))
# Step 2: Get metaKG and all predicates from Translator APIs through the SmartAPI system
metaKG = translator_metakg.get_KP_metadata(APInames)
print(metaKG.shape)
# Add metaKG from Plover API based KG resources
APInames,metaKG = translator_metakg.add_plover_API(APInames, metaKG)
print(metaKG.shape)
# Step 3: list metaKG information
All_predicates = list(set(metaKG['Predicate']))
All_categories = list((set(list(set(metaKG['Subject']))+list(set(metaKG['Object'])))))
API_withMetaKG = list(set(metaKG['API']))
# generate a dictionary of API and its predicates
API_predicates = {}
for api in API_withMetaKG:
API_predicates[api] = list(set(metaKG[metaKG['API'] == api]['Predicate']))
return APInames, metaKG, API_predicates
[docs]
def optimize_query_json(query_json, API_name_cur, API_predicates):
'''
Optimize the query JSON by removing predicates that are not supported by the selected APIs.
Parameters
----------
query_json1 : str
a query in TRAPI 1.5.0 format
API_name_cur : str
the name of the API to query
API_predicates : dict
a dictionary of API names and their predicates
Returns
--------
A modified query JSON with only the predicates supported by the selected APIs.
Examples
--------
>>>
'''
query_json_cur = query_json.copy() # copy the query_json to avoid modifying the original query_json
# Get the list of APIs that support the predicates in the query
shared_predicates = list(set(API_predicates[API_name_cur]).intersection(query_json_cur['message']['query_graph']['edges']['e00']['predicates'] ))
if len(shared_predicates) > 0:
query_json_cur['message']['query_graph']['edges']['e00']['predicates'] = shared_predicates
#print(API_name_cur + ": Predicates optimized to: " + str(shared_predicates))
else:
#print(API_name_cur + ": No shared predicates found. Using all predicates in the query.")
# If no shared predicates, keep the original predicates
query_json_cur['message']['query_graph']['edges']['e00']['predicates'] = query_json_cur['message']['query_graph']['edges']['e00']['predicates']
return query_json_cur
[docs]
def query_KP(API_name_cur, query_json, APInames, API_predicates):
"""
Query an individual API with a TRAPI 1.5.0 query JSON,
without modifying the original query_json.
"""
API_url_cur = APInames[API_name_cur]
from copy import deepcopy
# deep‐copy so we never touch the caller’s data
query_copy = deepcopy(query_json)
# optimize on our private copy
query_json_cur = optimize_query_json(query_copy, API_name_cur, API_predicates)
response = requests.post(API_url_cur, json=query_json_cur)
if response.status_code == 200:
result = response.json().get("message", {})
kg = result.get("knowledge_graph", {})
edges = kg.get("edges", {})
if edges:
print(f"{API_name_cur}: Success!")
return result
elif "knowledge_graph" in result:
return None
#print(f"{API_name_cur}: No result returned")
else:
#print(f"{API_name_cur}: Warning Code: {response.status_code}")
return None
[docs]
def parallel_api_query(query_json, select_APIs, APInames, API_predicates,max_workers=1):
'''
Queries multiple APIs in parallel and merges the results into a single knowledge graph.
Parameters
----------
URLS
list of API URLs to query
query_json
the query JSON to be sent to each API
max_workers
number of parallel workers to use for querying
Returns
-------
Returns a merged knowledge graph from all successful API responses.
Examples
--------
>>> result = TCT.parallel_api_query(API_URLs,query_json=query_json, max_workers=len(API_URLs1))
'''
# Parallel query
result = []
from concurrent.futures import ThreadPoolExecutor, as_completed
from copy import deepcopy
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# copy the query_json for each API to avoid modifying the original query_json
query_json_cur = deepcopy(query_json)
future_to_url = {executor.submit(query_KP, API_name_cur, query_json_cur, APInames, API_predicates): API_name_cur for API_name_cur in select_APIs}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
if 'knowledge_graph' in data:
result.append(data)
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
included_KP_ID = []
for i in range(0,len(result)):
if result[i]['knowledge_graph'] is not None:
if 'knowledge_graph' in result[i]:
if 'edges' in result[i]['knowledge_graph']:
if len(result[i]['knowledge_graph']['edges']) > 0:
included_KP_ID.append(i)
result_merged = {}
for i in included_KP_ID:
result_merged = {**result_merged, **result[i]['knowledge_graph']['edges']}
len(result_merged)
return(result_merged)