import json
import re
import string

corenlp_props = {
    'annotators': 'tokenize, pos, ner, dcoref',
    'pipelineLanguage': 'en',
    'outputFormat': 'json',
    'parse.maxlen': '1000',
    'timeout': '500000'
}

STOPWORDS_FILE = "../data/resource/stopwords.txt"

DOMAINS = ["hotel", "train", "restaurant", "attraction", "taxi", "book"]

SLOTS = {'area', 'arrive', 'day', 'departure', 'destination', 'food',
         'internet', 'leave', 'name', 'parking', 'people', 'price',
         'stars', 'stay', 'time', 'type'}

VALUES_CONVERT = {
    'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
    'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
    'wifi': 'internet', 'wlan': 'internet', 'wi-fi': 'internet',
    'moderately': 'moderate',
}


def bad_entity(text):
    """Return True for vague mention texts that should never represent a coref chain."""
    return text in {"this", "that", "there", "here", "|", "less", "more"}


def fix_stanford_coref(stanford_json):
    """Convert CoreNLP coref output to 0-based indices and repair representative mentions."""
    true_corefs = {}
    # get a chain
    for key, coref in stanford_json["corefs"].items():
        true_coref = []
        # get an entity mention; convert its indices to start from 0
        for entity in coref:
            sent_num = entity["sentNum"] - 1
            start_index = entity["startIndex"] - 1
            end_index = entity["endIndex"] - 1
            head_index = entity["headIndex"] - 1
            entity_label = stanford_json["sentences"][sent_num]["tokens"][head_index]["ner"]
            entity["sentNum"] = sent_num
            entity["startIndex"] = start_index
            entity["endIndex"] = end_index
            entity["headIndex"] = head_index
            entity["headWord"] = entity["text"].split(" ")[head_index - start_index]
            entity["entityType"] = entity_label
            true_coref.append(entity)
        # check chain is not empty
        if len(true_coref) > 0:
            no_representative = True
            has_representative = False
            for idx, entity in enumerate(true_coref):
                if entity["isRepresentativeMention"]:
                    if not (entity["type"] == "PRONOMINAL"
                            or bad_entity(entity["text"].lower())
                            or len(entity["text"].split(" ")) > 10):
                        no_representative = False
                        has_representative = True
                    else:
                        # remove bad representative assignments
                        true_coref[idx]["isRepresentativeMention"] = False
            # ensure there exists at least one representative mention
            if no_representative:
                for idx, entity in enumerate(true_coref):
                    if not (entity["type"] == "PRONOMINAL"
                            or bad_entity(entity["text"].lower())
                            or len(entity["text"].split(" ")) > 10):
                        true_coref[idx]["isRepresentativeMention"] = True
                        has_representative = True
            if has_representative:
                true_corefs[key] = true_coref
    return true_corefs


def clean(corefs: list, stopwords_list: list):
    """Drop nested duplicate mentions, then strip stopwords from mention edges."""
    dup_ids = []
    for i, coref1 in enumerate(corefs):
        consist_num = 0
        short = []
        for j, coref2 in enumerate(corefs):
            if coref1[2][0] <= coref2[2][0] and coref1[2][1] >= coref2[2][1] and i != j:
                consist_num += 1
                short.append(j)
        if consist_num > 1:
            # a span containing several other mentions is dropped itself
            dup_ids.append(i)
        elif consist_num == 1:
            # a span containing exactly one other mention absorbs it
            dup_ids.extend(short)
    corefs = [corefs[i] for i in range(len(corefs)) if i not in dup_ids]
    temp = []
    for coref in corefs:
        seq = coref[-1].split()
        # trim stopwords from both ends of the mention text
        while seq and (seq[0] in stopwords_list or seq[-1] in stopwords_list):
            if seq[0] in stopwords_list:
                del seq[0]
            if seq and seq[-1] in stopwords_list:  # re-check: seq may have just emptied
                del seq[-1]
        if not seq:
            temp.append(coref)
        else:
            coref[-1] = ' '.join(seq)
    for t in temp:
        corefs.remove(t)
    return corefs


def get_candidates(user_annotation, stopwords_list):
    """Candidates include adjectives, entities and corefs."""
    tokens = []
    candidates = {}
    entities = []
    postags = []
    corefs = []
    base_index = [0]
    read_annotation(user_annotation, base_index, stopwords_list, tokens, entities,
                    postags, corefs, 0)
    candidates['postag'] = postags
    candidates['coref'] = clean(corefs, stopwords_list)
    candidates['coref'].extend(entities)
    return candidates
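
# A small illustration of `clean` above on hypothetical candidate records (the
# data below is made up for demonstration). Each record has the shape
# [head_idx, head_word, [span_start, span_end], mention_text]:
#
#   clean([[0, 'hotel', [0, 3], 'the cheap hotel'],
#          [2, 'hotel', [2, 3], 'hotel']],
#         stopwords_list=['the'])
#
# The outer span [0, 3] contains exactly one other mention, so the nested
# [2, 3] record is absorbed; stopword trimming then reduces the survivor's
# text from 'the cheap hotel' to 'cheap hotel'.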

def is_stop(text: str, stopwords_list: list):
    """Return True if `text` contains at least one non-stopword token."""
    return any(token.lower() not in stopwords_list for token in text.split())


def read_annotation(annotation, base_index, stopwords_list, tokens, entities,
                    postags, corefs, num_sen):
    """Collect entity mentions, adjectives/adverbs and coref mentions with dialog-level indices."""
    sentences = annotation["sentences"]
    for i, sentence in enumerate(sentences):
        for entity in sentence['entitymentions']:
            head_idx = base_index[i + num_sen] + entity['tokenBegin']
            head = sentence['tokens'][entity['tokenBegin']]['originalText']
            mention = entity['text']
            mention_start_idx = base_index[i + num_sen] + entity['tokenBegin']
            mention_end_idx = base_index[i + num_sen] + entity['tokenEnd']
            mention_idx = [mention_start_idx, mention_end_idx]
            entities.append([head_idx, head, mention_idx, mention])
        for j, token in enumerate(sentence['tokens']):
            tokens.append(token['word'])
            pos = token['pos']
            lemma = token['lemma']
            text = token['originalText']
            if pos in ['JJ', 'RB']:
                # previous token, empty at sentence start (a negative index would
                # not raise IndexError but silently wrap to the sentence's end)
                prev = sentence['tokens'][j - 1]['originalText'] if j > 0 else ''
                if (not re.search(r"([a-z]\.[a-z])", lemma)) \
                        and lemma not in stopwords_list and prev != 'not':
                    head_idx = base_index[i + num_sen] + token['index'] - 1
                    postags.append([head_idx, text])
        base_index.append(base_index[-1] + len(sentence['tokens']))
    for coref in annotation['corefs'].values():
        for realization in coref:
            sent_num = realization['sentNum']
            head_index = realization['headIndex']
            head_idx = base_index[sent_num + num_sen] + head_index
            head = sentences[sent_num]['tokens'][head_index]['originalText']
            text_start_index = realization['startIndex']
            text_start_idx = base_index[sent_num + num_sen] + text_start_index
            text_end_index = realization['endIndex']
            text_end_idx = base_index[sent_num + num_sen] + text_end_index
            span_tokens = sentences[sent_num]['tokens'][text_start_index:text_end_index]
            text_lemma = ' '.join(t['originalText'] for t in span_tokens)
            # the two preceding tokens, empty at sentence start (explicit guards
            # instead of try/except, since negative indices do not raise in Python)
            prev1 = (sentences[sent_num]['tokens'][text_start_index - 1]['originalText']
                     if text_start_index >= 1 else '')
            prev2 = (sentences[sent_num]['tokens'][text_start_index - 2]['originalText']
                     if text_start_index >= 2 else '')
            # keep mentions that contain at least one non-stopword and are not negated
            if is_stop(text_lemma, stopwords_list) and prev1 != 'not' and prev2 != 'not':
                corefs.append([head_idx, head, [text_start_idx, text_end_idx], text_lemma])
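
# `get_value_candidates_from_history` below expects `history` to be a flat list
# of speaker-prefixed turn strings, e.g.
#   ['user : i need a cheap hotel', 'system : what area do you prefer ?']
# Only the 'user :' turns are concatenated and sent to CoreNLP. (The example
# turns are illustrative, inferred from the prefix handling in the code.)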

def get_value_candidates_from_history(corenlp, history):
    """Extract candidate slot values from all user turns in the dialog history."""
    if len(history) == 0:
        return []
    stopwords = []
    with open(STOPWORDS_FILE, 'r') as fin:
        for line in fin:
            stopwords.append(line.strip())
    value_candidates = set()
    user_utterance = ' '.join(utterance[len('user :'):]
                              for utterance in history
                              if utterance.startswith('user :'))
    annotation = json.loads(corenlp.annotate(user_utterance, properties=corenlp_props))
    annotation['corefs'] = fix_stanford_coref(annotation)
    candidates = get_candidates(annotation, stopwords)
    for candidate in candidates.values():
        for c in candidate:
            if len(c) == 2:
                # postag record: [head_idx, adjective/adverb text]
                value_candidates.add(c[1].strip().lower())
            elif len(c[3].split()) > 5:
                # overly long mention text: fall back to its head word
                value_candidates.add(c[1].strip().lower())
            else:
                value_candidates.add(c[3].strip().lower())
    # clean value candidates
    values = set()
    for value in value_candidates:
        if value in VALUES_CONVERT:
            value = VALUES_CONVERT[value]
        if value not in DOMAINS \
                and value not in SLOTS \
                and value not in string.punctuation \
                and value not in stopwords \
                and not value.startswith("'"):
            # remove spaces before punctuation
            value = re.sub(r"\s+([?.!'])", r"\1", value).strip()
            if value and value[0].isdigit():
                # remove everything after the end of a number
                value = re.sub(r'\D+$', '', value)
            if value.strip() and len(value.split()) <= 4:
                values.add(value.strip())
    return list(values)
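
# A minimal usage sketch. It assumes the third-party `stanfordcorenlp` client
# package and a CoreNLP server already listening at the given address; the
# server location, the package choice and the example dialog are assumptions,
# not part of this module.
if __name__ == '__main__':
    from stanfordcorenlp import StanfordCoreNLP

    nlp = StanfordCoreNLP('http://localhost', port=9000)  # hypothetical server address
    history = [
        'user : i am looking for a cheap hotel in the north',
        'system : okay , does it need free parking ?',
        'user : yes , and free wifi please',
    ]
    try:
        print(get_value_candidates_from_history(nlp, history))
    finally:
        nlp.close()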