"""
Admissibility Constraints:
An expression to be checked for admissibility should be a DRS which embeds a NewInfoDRS:
The former represents previous discourse (if any), the latter is some new discourse that
has been resolved with respect to that previous discourse. There are two groups of Admissibility
Constraints (van der Sandt 1992):
Global Constraints:
Prover - negative check for consistency.
Prover - negative check for informativity.
Builder - positive check for consistency.
Builder - positive check for informativity.
Global Consistency: Give a NegatedExpression of the resulting discourse to the prover.
If it returns True, the discourse is inconsistent. Give the resulting discourse to
the builder. If it returns a model, the discourse is consistent.
Global Informativity: Negate new discourse. Give a NegatedExpression of the resulting
discourse to the prover. If the prover returns True, then the resulting discourse
is uninformative. Give the resulting discourse to the builder. If it returns a model,
the new reading is informative.
Local Constraints:
Superordinate DRSs should entail neither a DRS nor its negation.
Generate a list of ordered pairs such that the first element of the pair is the DRS
representing the entire discourse but without some subordinate DRS and the second is
this subordinate DRS to be checked on entailment:
(i) From DrtNegatedExpression -K, if K is a DRS, take K and put it in the second
place of the ordered pair. Remove the DrtNegatedExpression and place the rest in
the first place of the order pair.
(ii) From a DrtImpCondition K->L, provided that both are DRSs, create two ordered
pairs: (a) Put the antecedent DRS K into the second place, remove the DrtImpCondition
and put the result into the first place of the ordered pair; (b) Put the consequent
DRS L into the second place, merge the antecedent DRS K globally and put the
result into the first place of the ordered pair.
(iii) From a DrtOrExpression K | L, provided that both are DRSs, create two ordered
pairs: Put each of the disjuncts into the second place of each pair, remove the
DrtORExpression and put the result into the first place of that pair.
"""
__author__ = "Peter Makarov, Alex Kislev, Emma Li"
__version__ = "1.0"
__date__ = "Tue, 24 Aug 2010"
import subprocess
from threading import Thread
from nltk.internals import find_binary
from nltk.sem import Valuation
from nltk.sem.logic import is_indvar
from nltk.sem import drt
from nltk.inference.mace import MaceCommand
from nltk.inference.prover9 import convert_to_prover9
from nltk.sem.logic import AndExpression, NegatedExpression
from .temporaldrt import DRS, DrtExpression, DrtBooleanExpression, DrtNegatedExpression, DrtConstantExpression, \
DrtApplicationExpression, DrtTokens, NewInfoDRS, \
DrtConcatenation, DrtImpExpression, DrtOrExpression, PresuppositionDRS, \
DrtEventualityApplicationExpression
[docs]class Communicator(Thread):
"""a thread communicating with a process, terminates once the communication is over"""
[docs] def __init__(self, process, input=None):
Thread.__init__(self)
self.process = process
self.input = input
[docs] def run(self):
try:
self.result = self.process.communicate(self.input)
except OSError:
pass
@property
def res(self):
return self.process.communicate(self.input)
class Theorem(object):
BINARY_LOCATIONS = ('/usr/local/bin', '/usr/bin')
PROVER_BINARY = None
BUILDER_BINARY = None
INTERPFORMAT_BINARY = None
def __init__(self, prover_goal, builder_goal, prover_timeout=60, builder_max_models=500):
self.prover_goal = prover_goal
self.builder_goal = builder_goal
self.prover_timeout = prover_timeout
self.builder_max_models = builder_max_models
def _find_binary(self, name, verbose=True):
return find_binary(name,
searchpath=Theorem.BINARY_LOCATIONS,
env_vars=['PROVER9HOME'],
url='http://www.cs.unm.edu/~mccune/prover9/',
binary_names=[name],
verbose=verbose)
def _prover9_input(self):
return "clear(auto_denials).\n%s" % self._input(self.prover_goal)
def _mace_input(self):
return self._input(self.builder_goal)
def _input(self, goal):
return "formulas(goals).\n %s.\nend_of_list.\n\n" % convert_to_prover9(goal)
def check(self, run_builder=False, verbose=True):
prover_input = 'assign(max_seconds, %d).\n\n' % self.prover_timeout if self.prover_timeout > 0 else ""
prover_input += self._prover9_input()
builder_input = 'assign(end_size, %d).\n\n' % self.builder_max_models if self.builder_max_models > 0 else ""
builder_input += self._mace_input()
return self._call(prover_input, builder_input, run_builder, verbose)
def _model(self, valuation_str, verbose=True):
"""
Transform the output file into an NLTK-style Valuation.
@return: A model if one is generated; None otherwise.
@rtype: L{nltk.sem.Valuation}
"""
valuation_standard_format = self._transform_output(valuation_str, 'standard', verbose)
val = []
for line in valuation_standard_format.splitlines(False):
l = line.strip()
if l.startswith('interpretation'):
# find the number of entities in the model
num_entities = int(l[l.index('(') + 1:l.index(',')].strip())
elif l.startswith('function') and l.find('_') == -1:
# replace the integer identifier with a corresponding alphabetic character
name = l[l.index('(') + 1:l.index(',')].strip()
if is_indvar(name):
name = name.upper()
value = int(l[l.index('[') + 1:l.index(']')].strip())
val.append((name, MaceCommand._make_model_var(value)))
elif l.startswith('relation'):
l = l[l.index('(') + 1:]
if '(' in l:
#relation is not nullary
name = l[:l.index('(')].strip()
values = [int(v.strip()) for v in l[l.index('[') + 1:l.index(']')].split(',')]
val.append((name, MaceCommand._make_relation_set(num_entities, values)))
else:
#relation is nullary
name = l[:l.index(',')].strip()
value = int(l[l.index('[') + 1:l.index(']')].strip())
val.append((name, value == 1))
return Valuation(val)
def _transform_output(self, input_str, format, verbose=True):
if Theorem.INTERPFORMAT_BINARY is None:
Theorem.INTERPFORMAT_BINARY = self._find_binary('interpformat', verbose)
if verbose:
print('Calling Interpformat:', Theorem.INTERPFORMAT_BINARY)
print('Args:', format)
print('Input:\n', input_str, '\n')
p = subprocess.Popen([Theorem.INTERPFORMAT_BINARY, format], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE)
(stdout, stderr) = p.communicate(input_str)
if verbose:
print('Return code:', p.returncode)
if stdout: print('stdout:\n', stdout, '\n')
if stderr: print('stderr:\n', stderr, '\n')
return stdout
def _call(self, prover_input, builder_input, run_builder, verbose):
if Theorem.PROVER_BINARY is None:
Theorem.PROVER_BINARY = self._find_binary('prover9', verbose)
if Theorem.BUILDER_BINARY is None:
Theorem.BUILDER_BINARY = self._find_binary('mace4', verbose)
if verbose:
print('Calling Prover:', Theorem.PROVER_BINARY)
print('Prover Input:\n', prover_input, '\n')
print('Calling Builder:', Theorem.BUILDER_BINARY)
print('Builder Input:\n', builder_input, '\n')
prover_process = subprocess.Popen([Theorem.PROVER_BINARY], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
prover_thread = Communicator(prover_process, prover_input)
prover_thread.start()
if run_builder:
builder_process = subprocess.Popen([Theorem.BUILDER_BINARY], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
builder_thread = Communicator(builder_process, builder_input)
builder_thread.start()
while prover_thread.is_alive() and (not run_builder or builder_thread.is_alive()):
pass
#stdout, stderr = prover_thread.result
stdout, stderr = prover_thread.res
returncode = prover_process.poll()
result = not (returncode == 0)
output = None
if not prover_thread.is_alive():
if verbose:
print("Prover done, Builder %s " % ("done" if not builder_thread.is_alive() else "running"))
stdout, stderr = prover_thread.result
returncode = prover_process.poll()
result = not (returncode == 0)
output = None
if run_builder and builder_process.poll() is None:
if verbose:
print("builder is still running, terminating...")
try:
builder_process.terminate()
except OSError:
pass
elif run_builder and not builder_thread.is_alive():
if verbose:
print("Prover %s, Builder done " % ("done" if not prover_thread.is_alive() else "running"))
stdout, stderr = builder_thread.result
returncode = builder_process.poll()
result = (returncode == 0)
output = stdout
if prover_process.poll() is None:
if verbose:
print("prover is still running, terminating...")
try:
prover_process.terminate()
except OSError:
pass
if verbose:
if stdout: print(('output:\t%s' % stdout))
if stderr: print(('error:\t%s' % stderr))
print('return code:', returncode)
# transform the model if one is available
if output is not None:
output = self._model(stdout, verbose)
return (result, output)
[docs]def inference_check(expr, background_knowledge=False, verbose=True):
"""General function for all kinds of inference-based checks:
consistency, global and local informativity"""
assert isinstance(expr, (DRS, drt.DRS)), "Expression %s is not a DRS"
expression = expr.deepcopy()
if verbose:
print("\n##### Inference check initiated #####\n\nExpression:\t%s\n" % expression)
def _remove_temporal_conds(e):
"""Removes discourse structuring temporal conditions that could
affect inference check"""
for cond in list(e.conds):
if isinstance(cond, DrtEventualityApplicationExpression) and \
isinstance(cond.function, DrtEventualityApplicationExpression) and \
cond.function.function.variable.name in DrtTokens.TEMP_CONDS:
e.conds.remove(cond)
elif isinstance(cond, DRS):
_remove_temporal_conds(cond)
elif isinstance(cond, DrtNegatedExpression) and \
isinstance(cond.term, DRS):
_remove_temporal_conds(cond.term)
elif isinstance(cond, DrtBooleanExpression) and \
isinstance(cond.first, DRS) and isinstance(cond.second, DRS):
_remove_temporal_conds(cond.first)
_remove_temporal_conds(cond.second)
def _check(expression):
"""method performing check"""
if background_knowledge:
e = AndExpression(expression.fol(), background_knowledge)
if verbose:
print("performing check on: %s" % e)
t = Theorem(NegatedExpression(e), e)
else:
if verbose:
print("performing check on: %s" % expression.fol())
t = Theorem(NegatedExpression(expression), expression)
result, output = t.check()
if verbose:
if output:
print("\nMace4 returns:\n%s\n" % output)
else:
print("\nProver9 returns: %s\n" % (not result))
return result
def consistency_check(expression):
"""1. Consistency check"""
if verbose:
print("### Consistency check initiated...\n")
if not _check(expression):
error_message = "New discourse is inconsistent on the following interpretation:\n\n%s" % expression
if verbose:
print("#!!!#: %s" % error_message)
return ConsistencyError(error_message)
else:
if verbose:
print("##OK##: Expression is consistent\n")
return True
def informativity_check(expression):
"""2. Global informativity check"""
if verbose:
print("### Informativity check initiated...\n")
local_check = []
for cond in reversed(expression.conds):
if isinstance(cond, DRS) and \
not isinstance(cond, PresuppositionDRS):
#New discourse in the previous discourse
temp = (expression.conds[:expression.conds.index(cond)] +
[DrtNegatedExpression(cond)] +
expression.conds[expression.conds.index(cond) + 1:])
e = expression.__class__(expression.refs, temp)
if verbose:
print("new discourse %s found in %s \n" % (cond, expression))
print("expression for global check: %s \n" % e)
if not _check(e):
#new discourse is uninformative
error_message = ("New expression is uninformative on the following interpretation:\n\n%s"
% expression)
if verbose:
print("#!!!# %s" % error_message)
return InformativityError(error_message)
else:
temp = (expression.conds[:expression.conds.index(cond)] +
expression.conds[expression.conds.index(cond) + 1:])
#Put sub-DRS into main DRS and start informativity check
return informativity_check(expression.__class__(expression.refs + cond.refs, temp + cond.conds))
#generates tuples for local admissibility check
elif isinstance(cond, DrtOrExpression) and \
isinstance(cond.first, DRS) and isinstance(cond.second, DRS):
temp = (expression.conds[:expression.conds.index(cond)] +
expression.conds[expression.conds.index(cond) + 1:])
local_check.append((expression.__class__(expression.refs, temp), cond.first))
local_check.append((expression.__class__(expression.refs, temp), cond.second))
elif isinstance(cond, DrtImpExpression) and \
isinstance(cond.first, DRS) and isinstance(cond.second, DRS):
temp = (expression.conds[:expression.conds.index(cond)] +
expression.conds[expression.conds.index(cond) + 1:])
local_check.append((expression.__class__(expression.refs, temp), cond.first))
local_check.append((DrtConcatenation(expression.__class__(expression.refs, temp),
cond.first).simplify(), cond.second))
if not local_check == []:
return local_informativity_check(local_check)
else:
if verbose:
print("##OK##: Expression is informative\n")
return True
def local_informativity_check(check_list):
"""3. Local admissibility constraints"""
if verbose: print("### Local admissibility check initiated...\n%s\n" % check_list)
for main, sub in check_list:
assert isinstance(main, (DRS, drt.DRS)), "Expression %s is not a DRS"
assert isinstance(sub, (DRS, drt.DRS)), "Expression %s is not a DRS"
if not _check(main.__class__(main.refs, main.conds + [DrtNegatedExpression(sub)])):
error_message = "New discourse is inadmissible due to local uninformativity:\n\n%s entails %s" % (main, sub)
if verbose:
print("#!!!#: ", error_message)
return AdmissibilityError(error_message)
elif not _check(main.__class__(main.refs, main.conds + [sub])):
error_message = "New discourse is inadmissible due to local uninformativity:\n\n%s entails the negation of %s" % (main, sub)
if verbose:
print("#!!!#: ", error_message)
return AdmissibilityError(error_message)
if verbose: print("##OK##: Main %s does not entail sub %s nor its negation\n" % (main, sub))
return True
_remove_temporal_conds(expression)
if verbose: print("Expression without eventuality-relating conditions: %s \n" % expression)
cons_check = consistency_check(expression)
if cons_check is True:
inf_check = informativity_check(expression)
if inf_check is True:
for cond in expr.conds:
#Merge DRS of the new expression into the previous discourse
result = expr
if isinstance(cond, NewInfoDRS):
#change to NewInfoDRS when done
result = expr.__class__(expr.refs + cond.refs,
(expression.conds[:expr.conds.index(cond)] +
expression.conds[expr.conds.index(cond) + 1:]) + cond.conds)
if verbose:
print("\n#### Inference check passed ####\n")
return result, "Sentence admitted"
else:
if verbose:
print("\n###!!!# Inference check failed #!!!###\n")
return False, inf_check
else:
if verbose:
print("\n###!!!# Inference check failed #!!!###\n")
return False, cons_check
[docs]class AdmissibilityError(str):
pass
[docs]class ConsistencyError(str):
pass
[docs]def get_bk(drs, dictionary):
"""Collects background knowledge relevant for a given expression.
DrtConstantExpression variable names are used as keys"""
assert isinstance(drs, (DRS, drt.DRS)), "Expression %s is not a DRS" % drs
assert isinstance(dictionary, dict), "%s is not a dictionary" % dictionary
bk_list = []
for cond in drs.conds:
if isinstance(cond, DrtApplicationExpression):
if isinstance(cond.function, DrtConstantExpression):
bk_formula = dictionary.get(cond.function.variable.name, False)
elif isinstance(cond.function, DrtApplicationExpression) and \
isinstance(cond.function.function, DrtConstantExpression):
bk_formula = dictionary.get(cond.function.function.variable.name, False)
if bk_formula:
bk_list.append(bk_formula)
elif isinstance(cond, DRS):
bk_list.extend(get_bk(cond, dictionary))
elif isinstance(cond, DrtNegatedExpression) and \
isinstance(cond.term, DRS):
bk_list.extend(get_bk(cond.term, dictionary))
elif isinstance(cond, DrtBooleanExpression) and \
isinstance(cond.first, DRS) and isinstance(cond.second, DRS):
bk_list.extend(get_bk(cond.first, dictionary))
bk_list.extend(get_bk(cond.second, dictionary))
return list(set(bk_list))
#ontology
ONTOLOGY = {
'individual' : r'all x.((individual(x) -> -(eventuality(x) | time(x))) & (eventuality(t) -> -time(x)) & (state(x) -> (eventuality(x) & -event(x))) & (event(x) -> eventuality(x)))',
'time' : r'all x.((individual(x) -> -(eventuality(x) | time(x))) & (eventuality(t) -> -time(x)) & (state(x) -> (eventuality(x) & -event(x))) & (event(x) -> eventuality(x)))',
'state' : r'all x.((individual(x) -> -(eventuality(x) | time(x))) & (eventuality(t) -> -time(x)) & (state(x) -> (eventuality(x) & -event(x))) & (event(x) -> eventuality(x)))',
'event' : r'all x.((individual(x) -> -(eventuality(x) | time(x))) & (eventuality(t) -> -time(x)) & (state(x) -> (eventuality(x) & -event(x))) & (event(x) -> eventuality(x)))'
}