"""Extract slot-value candidates from dialogue history with Stanford CoreNLP.

NOTE(review): importing this module has side effects — it installs CoreNLP
into ./corenlp-dir (downloading it and the English "extra" models if absent)
and sets the CORENLP_HOME environment variable for the CoreNLP client.
"""
import stanza
import os
from pathlib import Path
from stanza.server import CoreNLPClient

# Newline-separated stopword list, one word per line; path is relative to the
# process working directory — TODO confirm callers run from the expected cwd.
STOPWORDS_FILE = "../data/resource/stopwords.txt"
# CoreNLP installation directory, resolved under the current working directory.
CORENLP_DIR = str(Path().absolute()) + '/corenlp-dir'
# properties for the CoreNLP Server
ANNOTATORS = ['tokenize', 'ssplit', 'pos', 'lemma', 'ner', 'regexner']

# install/download the latest version of CoreNLP (runs at import time)
stanza.install_corenlp(dir=CORENLP_DIR)

extra_model_jar = os.path.join(CORENLP_DIR, 'stanford-corenlp-4.5.1-models-english-extra.jar')
if not os.path.isfile(extra_model_jar):
    # download corenlp english models
    stanza.download_corenlp_models(model='english-extra', version='4.5.1', dir=CORENLP_DIR)
else:
    print('English Extra CoreNLP models available!')

# set environment var of installation location (consulted by CoreNLPClient)
os.environ["CORENLP_HOME"] = CORENLP_DIR

# Normalization table: spelled-out numbers -> digits, plus a few negated
# phrases mapped to their canonical slot values.
VALUES_CONVERT = {
    'zero': '0',
    'one': '1',
    'two': '2',
    'three': '3',
    'four': '4',
    'five': '5',
    'six': '6',
    'seven': '7',
    'eight': '8',
    'nine': '9',
    'ten': '10',
    'not important': 'dont care',
    'not expensive': 'cheap',
    'not cheap': 'expensive'
}

# corenlp properties: caseless POS/NER models plus a project-local regexner
# rules file for custom entity types (AREA, PLACE_TYPE, FOOD_TYPE, ...).
properties = {
    'ner.applyFineGrained': False,
    'pos.model': 'edu/stanford/nlp/models/pos-tagger/english-caseless-left3words-distsim.tagger',
    # Three NER models chained in one comma-separated string
    # (adjacent literals are concatenated by Python).
    'ner.model': 'edu/stanford/nlp/models/ner/english.all.3class.caseless.distsim.crf.ser.gz,'
                 'edu/stanford/nlp/models/ner/english.muc.7class.caseless.distsim.crf.ser.gz,'
                 'edu/stanford/nlp/models/ner/english.conll.4class.caseless.distsim.crf.ser.gz',
    'ner.additional.regexner.mapping': str(Path().absolute()) + '/regexner.rules'
}

# NER types handled by the date/time/number extraction branch.
DATE_TIME_NUM = {'TIME', 'DATE', 'NUMBER'}


class ValueExtractor:
    """Wraps a CoreNLP client to mine candidate slot values from user turns.

    Lifecycle: construct, then call start() before extract_value_candidates()
    and stop() when done (the client owns a CoreNLP server process).
    """

    def __init__(self):
        # create the CoreNLP client (server is not started until start())
        self.client = CoreNLPClient(
            annotators=ANNOTATORS,
            properties=properties,
            timeout=300000,
            memory='8G',
            threads=8,
            be_quiet=True
        )

        # load the stopwords.txt file into a set for O(1) membership tests
        self.stopwords = set()
        with open(STOPWORDS_FILE, 'r') as file:
            for line in file:
                self.stopwords.add(line.strip())

    def start(self):
        """Start the underlying CoreNLP server."""
        self.client.start()

    def stop(self):
        """Shut down the underlying CoreNLP server."""
        self.client.stop()

    def extract_value_candidates(self, history):
        """Return a list of candidate slot-value strings mined from *history*.

        history: iterable of dialogue turns; only turns prefixed with
        'user :' are considered. Candidates come from three passes over the
        CoreNLP annotation: (1) adjectives/adverbs (with 'not X' phrases
        normalized via VALUES_CONVERT), (2) DATE/TIME/NUMBER tokens, and
        (3) named-entity mentions. Returns [] when no user turns are present.
        """
        # format history and prepare text for annotation
        user_utterance = ''
        for utterance in history:
            # consider only user utterances
            if utterance.startswith('user :'):
                utterance = utterance[len('user :'):]
                # if sentence doesn't end with punctuation, add .
                if not utterance.endswith(('.', '?', '!')):
                    utterance = utterance + '.'
                # append all utterances into one text blob
                user_utterance = user_utterance + utterance + ' '

        if not user_utterance:
            return []

        value_candidates = []
        # drop apostrophes so contractions don't split into extra tokens
        user_utterance = user_utterance.replace("'", "")
        # use corenlp client and annotate text
        annotation = self.client.annotate(user_utterance)

        # last DATE/NNP token seen; appended once at the end (last one wins)
        date_day = ''
        for sent in annotation.sentence:
            prev_word = ''
            for token in sent.token:
                word = token.word.strip()
                # extract Adjectives & Adverbs using POS tags
                # exclude the custom entity types
                if token.pos in ['JJ', 'RB'] \
                        and token.ner not in ['AREA', 'PLACE', 'PLACE_TYPE', 'FOOD_TYPE']:
                    # strip a trailing 'ly' if present (no-op otherwise)
                    word = word.removesuffix('ly')
                    if word not in self.stopwords and word not in value_candidates:
                        # normalize 'not X' phrases, e.g. 'not cheap' -> 'expensive'
                        if prev_word == 'not' and prev_word + ' ' + word in VALUES_CONVERT:
                            word = VALUES_CONVERT[prev_word + ' ' + word]
                        value_candidates.append(word)
                # NOTE(review): prev_word tracks the (possibly rewritten)
                # previous token, used only for the 'not X' lookup above
                prev_word = word

                # extract day, time & numbers
                if token.ner in DATE_TIME_NUM:
                    # spelled-out numbers -> digits
                    if word in VALUES_CONVERT:
                        word = VALUES_CONVERT[word]
                    # keep only proper-noun (day names) and cardinal DATEs
                    if token.ner == "DATE" and token.pos not in ["NNP", "CD"]:
                        continue
                    if token.ner == "DATE" and token.pos == "NNP":
                        date_day = word
                        continue
                    if word not in self.stopwords:
                        if token.ner == "TIME":
                            # keep only times that start with a digit
                            if not word[0].isdigit():
                                continue
                            # pad 4-char times like '9:30' to '09:30'
                            if len(word) == 4:
                                word = word.zfill(5)
                        if word in value_candidates:
                            continue
                        value_candidates.append(word)

        # add day/date to value candidates
        if date_day:
            value_candidates.append(date_day)

        entity_map = {}
        # extract named entities (place, area,...)
        for sent in annotation.sentence:
            for mention in sent.mentions:
                entity_text = mention.entityMentionText.strip()
                if mention.entityType not in {'TIME', 'DATE', 'NUMBER', 'DURATION'} \
                        and entity_text not in value_candidates:
                    if mention.entityType in ['AREA', 'FOOD_TYPE', 'PLACE_TYPE']:
                        # one entity kept per custom type (last mention wins)
                        entity_map[mention.entityType] = entity_text
                    else:
                        value_candidates.append(entity_text)

        # custom-typed entities are appended after everything else
        more_values = list(entity_map.values())
        return value_candidates + more_values