#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
kblogger = logging.getLogger("kb")
DEBUG_LEVEL = logging.WARN
import sys
from errno import ECONNREFUSED, EWOULDBLOCK, EAGAIN
import threading, asyncore
import asynchat
import socket
import time
import random
import shlex
import json
import traceback
try:
from Queue import Queue, Empty
except ImportError: #Python3 compat
from queue import Queue, Empty
EVENT_POLLING_RATE = 20 #Hz
DEFAULT_PORT = 6969
[docs]class NullHandler(logging.Handler):
"""Defines a NullHandler for logging, in case kb is used in an application
that doesn't use logging.
"""
[docs] def emit(self, record):
pass
kblogger = logging.getLogger("kb")
h = NullHandler()
kblogger.addHandler(h)
[docs]class KbError(Exception):
[docs] def __init__(self, value):
self.value = value
[docs] def __str__(self):
return repr(self.value)
[docs]class EventCallbackExecutor(threading.Thread):
[docs] def __init__(self, in_event_queue, out_polled_event_queue, callback_queue):
threading.Thread.__init__(self)
self.running = True
self._events = in_event_queue
self._polled_events = out_polled_event_queue
self._callbacks_queue = callback_queue
self._callbacks = {}
[docs] def run(self):
while self.running:
# wait for an event. Can not be blocking else we can not join the thread.
eventid, value = None, None
try:
eventid, value = self._events.get_nowait()
except Empty:
time.sleep(1. / EVENT_POLLING_RATE)
continue
# check if new callbacks have been registered
try:
while True:
newid, cb = self._callbacks_queue.get_nowait()
self._callbacks.setdefault(newid,[]).append(cb)
except Empty:
pass
if eventid not in self._callbacks:
# no callback associated. Put it back to the event queue for manual
# polling by the user
self._polled_events.put((eventid, value))
self._events.task_done()
else:
for cb in self._callbacks[eventid]:
kblogger.debug("Executing callback %s" % cb.__name__)
cb(value)
self._events.task_done()
[docs] def close(self):
# make sure all received events are handled
self._events.join()
self.running = False
self.join()
MSG_SEPARATOR = "#end#"
KB_OK="ok"
KB_ERROR="error"
KB_EVENT="event"
[docs]class KB:
[docs] def __init__(self, host='localhost', port=DEFAULT_PORT, embedded = False, defaultontology = None, sock=None):
#incoming events
self._internal_events = Queue()
# events that are not dealt with a callback
self.events = Queue()
self._callbackexecutor = None
self.embedded = embedded
if not self.embedded:
if not host or not port:
raise KbError("No host and/or port specified to connect to the knowledge base.")
self._channels = {}
self._asyncore_thread = threading.Thread( target = asyncore.loop, kwargs = {'timeout': .1, 'map': self._channels} )
self._client = RemoteKBClient(self._internal_events, self._channels, host, port, sock)
self._asyncore_thread.start()
else:
self._client = EmbeddedKBClient(defaultontology)
#add to the KB class all the methods the server declares
methods = self._client.call_server("methods")
if not methods:
raise KbError("Could not connect to the knowledge base. Is it started?")
for m in methods:
self.add_method(m.split("(")[0])
#new subscribers. The callbackExecutor thread is started only at the
# end of the constructor -> we first want to be sure we were able
# to connect to the knowledge base
self._registered_callbacks = Queue()
self._callbackexecutor = EventCallbackExecutor(self._internal_events, self.events, self._registered_callbacks)
self._callbackexecutor.start()
[docs] def add_method(self, m):
m = str(m) # convert from unicode...
def innermethod(*args, **kwargs):
kblogger.debug("Sending <%s(%s,%s)> request to server." % \
(m,
", ".join([str(a) for a in args]),
", ".join(str(k)+"="+str(v) for k,v in kwargs.items())))
return self._client.call_server(m, *args, **kwargs)
innermethod.__doc__ = "This method is a proxy for the knowledge server %s method." % m
#special cases for the server's methods we want to override
if m == "subscribe": innermethod.__name__ = "server_subscribe"
elif m == "close": innermethod.__name__ = "server_close"
else: innermethod.__name__ = m
setattr(self,innermethod.__name__,innermethod)
#### with statement ####
[docs] def __enter__(self):
return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] def __del__(self):
self.close()
[docs] def close(self):
if self._callbackexecutor:
self._callbackexecutor.close()
try:
self.server_close() # call the KB close() method. This will also close the RemoteKBClient channel if needed
except AttributeError:
# the connection is likely not yet established, so we did not create
# proxies for remote methods.
pass
if not self.embedded:
self._asyncore_thread.join()
else:
self._client.close()
[docs] def subscribe(self, pattern, callback = None, var = None, type = 'NEW_INSTANCE', trigger = 'ON_TRUE', models = None):
""" Allows to subscribe to an event, and get notified when the event is
triggered.
Example with callbacks:
>>> def onevent(evt):
>>> print("In callback. Got evt %s" % evt)
>>>
>>> self.kb.subscribe(["?o isIn room"], onevent)
>>> self.kb += ["alfred isIn room"]
>>> # 'onevent' get called
In callback. Got evt [u'alfred']
Example with 'polled' events:
>>> evt_id = self.kb.subscribe(["?o isIn room"])
>>> self.kb += ["alfred isIn room"]
>>> print(str(self.kb.events.get()))
('evt_7694742461071211105', [u'alfred'])
If 'callback' is provided, the callback will be invoked with the result of
the event (content depend on event type) *in a separate thread*.
If not callback is provided, the incoming events are stored in the
KB.events queue, and you can poll them yourself (which allow for better
control of the execution flow).
The 'var' parameter can be used with the 'NEW_INSTANCE' type of event to
tell which variable must be returned.
The 'models' parameter allows for registering an event in a specific list
of models. By default, the pattern is monitored on every models.
Returns the event id of the newly created event.
"""
if isinstance(pattern, basestring):
pattern = [pattern]
if var and not var.startswith('?'):
var = '?' + var
if type == 'NEW_INSTANCE' and not var:
#Look if there's more than one variable in the pattern
vars = set()
for ps in pattern:
vars |= set([s for s in shlex.split(ps) if s[0] == '?'])
if len(vars) > 1:
raise AttributeError("You must specify which variable must be returned " + \
"when the event is triggered by setting the 'var' parameter")
if len(vars) == 1:
var = vars.pop()
event_id = self.server_subscribe(type, trigger, var, pattern, models)
kblogger.debug("New event successfully registered with ID " + event_id)
if callback:
self._registered_callbacks.put((event_id, callback))
return event_id
[docs] def __getitem__(self, *args):
"""This method introduces a different way of querying the ontology server.
It uses the args (be it a string or a set of strings) to find concepts
that match the pattern.
An optional 'models' parameter can be given to specify the list of models the
query is executed on.
Depending on the argument, 4 differents behaviours are possible:
- with a string that can not be lexically split into 3 tokens (ie, a string
that do not look like a ``s p o`` tuple), a lookup is performed, and matching
resource are returned
- with a single ``s p o`` pattern:
- if only one of s, p, o is an unbound variable, returns the list of resources
matching this pattern.
- if 2 or 3 of the tokens are unbound variables (like ``kb["* * *"]``
or ``kb["* rdf:type *"]``), a list of statements matching the pattern
is returned.
- with a list of patterns, a list of dictionaries is returned with
possible combination of values for the different variables. For
instance, ``kb[["?agent desires ?action", "?action rdf:type Jump"]]``
would return something like: ``[{"agent":"james", "action":
"jumpHigh"}, {"agent": "laurel", "action":"jumpHigher"}]``
Attention: if more than one argument is passed, and if the last
argument is a list, this list is used as the set of models to execute
the query on. If not such list is provided, the query is executed on
all models.
Use example:
.. code:: python
import kb
kb = KB()
for agent in kb["* rdf:type Agent"]:
#...
if kb["* livesIn ?house", "?house isIn toulouse", ['GERALD']]:
#...
#Assuming 'toulouse' has label "ville rose":
city_id = kb["ville rose"]
"""
args = args[0]
# First, take care of models
models = None
if len(args) > 1 and isinstance(args[-1], list):
models = args[-1]
args = args[:-1]
def get_vars(s):
return [v for v in s if v.startswith('?')]
# Single argument
if isinstance(args, (str, unicode)) or len(args) == 1:
pattern = args[0] if isinstance(args, list) else args
toks = shlex.split(pattern)
if len(toks) == 3:
pattern = self._replacestar(toks)
vars = get_vars(pattern)
return self.find(vars, ["%s %s %s" % pattern], None, models)
else:
lookup = self.lookup(pattern, models)
return [concept[0] for concept in lookup]
# List of patterns
else:
patterns = [self._replacestar(shlex.split(p)) for p in args]
allvars = set()
for p in patterns:
allvars |= set(get_vars(p))
return self.find(list(allvars), ["%s %s %s"%p for p in patterns], None, models)
[docs] def __contains__(self, pattern):
""" This will return 'True' is either a concept - described by its ID or
label- or a statement or a set of statement is present (or can be infered)
in the ontology.
This allows syntax like:
.. code:: python
if 'Toto' in kb:
#...
if 'toto sees tata' in kb:
#...
"""
toks = shlex.split(pattern)
if len(toks) == 3:
pattern = self._replacestar(toks)
return self.exist(["%s %s %s" % pattern])
else:
return True if self.lookup(pattern) else False
[docs] def __iadd__(self, stmts):
""" This method allows to easily add new statements to the ontology
with the ``+=`` operator.
It can only add statement to the default robot's model (other agents' model are
not accessible).
.. code:: python
kb = KB(<host>, <port>)
kb += "toto likes icecream"
kb += ["toto loves tata", "tata rdf:type Robot"]
"""
if not (type(stmts) == list):
stmts = [stmts]
self.update(stmts)
return self
[docs] def __isub__(self, stmts):
""" This method allows to easily retract statements from the ontology
with the ``-=`` operator.
It can only add statement to the robot's model (other agents' model are
not accessible).
If a statement doesn't exist, it is silently skipped.
.. code:: python
kb = KB(<host>, <port>)
kb -= "toto likes icecream"
kb -= ["toto loves tata", "tata rdf:type Robot"]
"""
if not (type(stmts) == list):
stmts = [stmts]
self.retract(stmts)
return self
def _replacestar(self, pattern):
res = []
for tok in pattern:
if tok == '*':
res.append("?" + "".join(random.sample("abcdefghijklmopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ", 5)))
else:
res.append(tok)
return tuple(res)
[docs]class EmbeddedKBClient():
kb = None
kb_thread = None
kb_users = 0
[docs] def __init__(self, defaultontology = None):
try:
from minimalkb.kb import MinimalKB
except ImportError:
raise KbError("Embedded kb required, but MinimalKB can not be imported!")
minimalkblogger = logging.getLogger("minimalKB")
minimalkblogger.addHandler(NullHandler())
kblogger.warn("Using embedded kb: events are not yet supported!")
if not EmbeddedKBClient.kb:
kblogger.info("Initializing the embedded knowledge base.")
self._running = True
EmbeddedKBClient.kb_thread = threading.Thread(target=self.process, kwargs = {"defaultontology":defaultontology})
EmbeddedKBClient.kb_thread.start()
while not hasattr(self, "_kb"):
time.sleep(0.01)
else:
self._kb = EmbeddedKBClient.kb
if defaultontology:
kblogger.warn("The embedded knowledge base has already been " + \
"initialized. I will ignore default ontology <%s>." % defaultontology)
self._incoming_response = Queue()
EmbeddedKBClient.kb_users += 1
[docs] def call_server(self, method, *args, **kwargs):
self._kb.submitrequest(self, method, *args, **kwargs)
# if we are closing, do not wait for an answer
if method == "close":
return None
# Block until a result is available
status, value = self._incoming_response.get()
if status == KB_ERROR:
raise KbError(str(value))
else:
return value
[docs] def sendmsg(self, msg):
self._incoming_response.put(msg)
[docs] def process(self, defaultontology):
from minimalkb.kb import MinimalKB
EmbeddedKBClient.kb = MinimalKB(defaultontology)
self._kb = EmbeddedKBClient.kb
while self._running:
EmbeddedKBClient.kb.process()
[docs] def close(self):
EmbeddedKBClient.kb_users -= 1
if EmbeddedKBClient.kb_users == 0:
kblogger.debug("Last user of the embedded knowledge base has left. " + \
"Closing the knowledge base.")
self._running = False
EmbeddedKBClient.kb.stop_services()
EmbeddedKBClient.kb_thread.join()
EmbeddedKBClient.kb = None # reset kb to none so a new fresh thread may be created if needed.
[docs]class RemoteKBClient(asynchat.async_chat):
use_encoding = 0 # Python2 compat.
[docs] def __init__(self, event_queue, map, host='localhost', port=DEFAULT_PORT, sock=None):
asynchat.async_chat.__init__(self, sock=sock, map=map)
if not sock:
self.create_socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
self.connect( (host, port) )
self.host = host
self.port = port
self.set_terminator(MSG_SEPARATOR)
self._in_buffer = b""
self._incoming_response = Queue()
self._events = event_queue
[docs] def collect_incoming_data(self, data):
self._in_buffer = self._in_buffer + data
[docs] def found_terminator(self):
status, value = self.decode(self._in_buffer)
if status == KB_EVENT:
kblogger.debug("Event received: %s (%s)" % value)
self._events.put(value)
else:
self._incoming_response.put((status, value))
self._in_buffer = b""
[docs] def handle_error(self):
exctype, value = sys.exc_info()[:2]
if exctype == socket.error and value.errno == ECONNREFUSED:
kblogger.error("Connection refused!")
self.handle_close()
return
if exctype == socket.error and value.errno in (EAGAIN, EWOULDBLOCK):
kblogger.warn("Resource not available. Will retry.")
return
kblogger.error("Unhandled exception: %s: %s" % (exctype, value))
traceback.print_exc()
raise value
[docs] def call_server(self, method, *args, **kwargs):
self.push(self.encode(method, *args, **kwargs))
status, value = None, None
while True:
try:
status, value = self._incoming_response.get(True, 0.01)
break
except Empty:
if not self.connected:
# Connection closed!
self.close()
break
if status == KB_ERROR:
raise KbError(value)
else:
return value
[docs] def encode(self, method, *args, **kwargs):
return "\n".join([method] + \
[json.dumps(a) for a in args] + \
([json.dumps({"kwargs":kwargs})] if kwargs else []) + \
[MSG_SEPARATOR])
[docs] def decode(self, raw):
parts = raw.strip().split('\n')
if parts[0] == "ok":
if len(parts) > 1:
return "ok", json.loads(parts[1])
else:
return "ok", None
elif parts[0] == "event":
return "event", (parts[1], json.loads(parts[2]))
elif parts[0] == "error":
return "error", "%s: %s"%(parts[1], parts[2] if len(parts) == 3 else "[no error msg]")
else:
raise KbError("Got an unexpected message status from the knowledge base: %s"%parts[0])
#### patch code from asynchat, ``del deque[0]`` is not safe #####
[docs] def initiate_send(self):
while self.producer_fifo and self.connected:
first = self.producer_fifo.popleft()
# handle empty string/buffer or None entry
if not first:
if first is None:
self.handle_close()
return
# handle classic producer behavior
obs = self.ac_out_buffer_size
try:
data = first[:obs]
except TypeError:
data = first.more()
if data:
self.producer_fifo.appendleft(data)
continue
if isinstance(data, str) and self.use_encoding:
data = bytes(data, self.encoding)
# send the data
try:
num_sent = self.send(data)
except socket.error:
self.handle_error()
return
if num_sent:
if num_sent < len(data) or obs < len(first):
self.producer_fifo.appendleft(first[num_sent:])
# we tried to send some actual data
return
if __name__ == '__main__':
import time
from logging import StreamHandler
console = StreamHandler()
kblogger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)-15s: %(message)s')
console.setFormatter(formatter)
kblogger.addHandler(console)
kb = KB()
time.sleep(.1)
print("Closing now...")
kb.close()