gp_learner.py
#!/usr/bin/env python2.7
# encoding: utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
if __name__ == '__main__':
print(
"You probably wanted to execute run.py instead of this.",
file=sys.stderr
)
sys.exit(1)
from collections import Counter
from collections import defaultdict
from collections import OrderedDict
from functools import partial
from operator import attrgetter
import random
import signal
from time import sleep
from datetime import datetime
from cachetools.func import lru_cache
import deap
import deap.base
import deap.tools
import numpy as np
from rdflib import BNode
from rdflib import Literal
from rdflib import URIRef
from rdflib import Variable
from rdflib import XSD
import SPARQLWrapper
from scoop.futures import map as parallel_map
import six
import logging
logger = logging.getLogger(__name__)
import logging_config
from cluster import expected_precision_loss_by_query_reduction
from cluster import cluster_gps_to_reduce_queries
import config
from exception import GPLearnerAbortException
from fusion import fuse_prediction_results
from fusion import train_fusion_models
from gp_query import ask_multi_query
from gp_query import calibrate_query_timeout
from gp_query import combined_ask_count_multi_query
from gp_query import predict_multi_query
from gp_query import predict_query
from gp_query import query_stats
from gp_query import query_time_hard_exceeded
from gp_query import query_time_soft_exceeded
from gp_query import variable_substitution_query
from graph_pattern import canonicalize
from graph_pattern import gen_random_var
from graph_pattern import GPFitness
from graph_pattern import GPFitnessTuple
from graph_pattern import GraphPattern
from graph_pattern import GraphPatternStats
from graph_pattern import replace_vars_with_random_vars
from graph_pattern import SOURCE_VAR
from graph_pattern import TARGET_VAR
from ground_truth_tools import get_semantic_associations
from ground_truth_tools import k_fold_cross_validation
from ground_truth_tools import split_training_test_set
from gtp_scores import GTPScores
from memory_usage import log_mem_usage
from serialization import find_last_result
from serialization import load_predicted_target_candidates
from serialization import save_predicted_target_candidates
from serialization import find_run_result
from serialization import format_graph_pattern
from serialization import load_init_patterns
from serialization import load_results
from serialization import pause_if_signaled_by_file
from serialization import print_graph_pattern
from serialization import print_population
from serialization import print_results
from serialization import remove_old_result_files
from serialization import save_generation
from serialization import save_results
from serialization import save_run
from serialization import set_symlink
from utils import exception_stack_catcher
from utils import kv_str
from utils import log_all_exceptions
from utils import log_wrapped_exception
from utils import sample_from_list
logger.info('init gp_learner')
signal.signal(signal.SIGUSR1, log_mem_usage)
def init_workers():
parallel_map(_init_workers, range(1000))
def _init_workers(_):
# dummy function that makes workers load all imports and config
pass
def f_measure(precision, recall, beta=config.F_MEASURE_BETA):
"""Calculates the f1-measure from precision and recall."""
if precision + recall <= 0:
return 0.
beta_sq = beta ** 2
return (1 + beta_sq) * precision * recall / (beta_sq * precision + recall)
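# Illustrative worked example (added comment, not part of the original module):
# with beta=1 this is the harmonic mean of precision and recall, while beta>1
# weights recall higher, e.g.:
#   >>> round(f_measure(0.5, 1.0, beta=1), 3)   # 2*0.5*1.0 / (0.5 + 1.0)
#   0.667
#   >>> round(f_measure(0.5, 1.0, beta=2), 3)   # 5*0.5*1.0 / (4*0.5 + 1.0)
#   0.833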
@exception_stack_catcher
def evaluate(sparql, timeout, gtp_scores, graph_pattern, run=0, gen=0):
assert isinstance(graph_pattern, GraphPattern)
assert isinstance(gtp_scores, GTPScores)
ground_truth_pairs = gtp_scores.ground_truth_pairs
remaining_gain = gtp_scores.remaining_gain
gtp_max_precisions = gtp_scores.gtp_max_precisions
complete_pattern = 1 if graph_pattern.complete() else 0
pattern_length = len(graph_pattern)
vars_in_graph_pattern = graph_pattern.vars_in_graph
pattern_vars = len(vars_in_graph_pattern)
logger.debug('evaluating %s', graph_pattern)
# check how many gt_matches (recall) and targets (precision) can be reached
# there are several cases here:
# - ?source & ?target in pattern:
# we can use a combined_ask_count_multi_query to only use one query and
# get gt_matches and res_lengths at once
# - only ?source in pattern
# we can check how many gt_matches (recall) there are with ask_multi_query
# but as we don't have a ?target variable we can't ever reach a target
# with this pattern, meaning we have res_lengths = [] --> precision = 0
# - only ?target in pattern
# we can check how many gt_matches (recall) there are with ask_multi_query
# we could still get res_lengths with a count_query, which might be useful
# for universal targets? 'select count(?target) as ?count' but it turns
# out this is a bad idea:
# - it gives incomplete ?target var patterns an advantage against
# ?source only patterns (which will always have a precision of 0)
# - the precision of a ?target var only pattern is preferring very narrow
# patterns (one could say over-fitting) which are only tailored towards
# singling out one specific ?target variable as they can't rely on a
# ?source to actually help with the filtering. Example:
# (dbpedia-de:Pferd owl:sameAs ?target) (only returns dbpedia:Horse)
# - none of the above in pattern: should never happen
if complete_pattern:
query_time, (gt_matches, res_lengths) = combined_ask_count_multi_query(
sparql, timeout, graph_pattern, ground_truth_pairs)
else:
# run an ask_multi_query to see how many gt_matches (recall) we have
query_time, gt_matches = ask_multi_query(
sparql, timeout, graph_pattern, ground_truth_pairs)
res_lengths = {gtp: 0 for gtp in ground_truth_pairs}
matching_node_pairs = [
gtp for gtp in ground_truth_pairs if gt_matches[gtp]
]
# TODO: maybe punish patterns which produce results but don't match?
# judging query times:
# We can't trust counts returned by timed out queries. Nevertheless, we want
# to give soft-timeout queries a slight advantage over complete failures.
# For this we use the f_measure component, which still gets a severe
# punishment below, while the individual counts (and thereby gain and score)
# are ignored in case of a timeout.
qtime_exceeded = 0
if query_time_hard_exceeded(query_time, timeout):
qtime_exceeded = 1
elif query_time_soft_exceeded(query_time, timeout):
qtime_exceeded = .5
trust = (1 - qtime_exceeded)
sum_gt_matches = sum(gt_matches.values())
recall = sum_gt_matches / len(ground_truth_pairs)
non_zero_res_lens = [l for l in res_lengths.values() if l > 0]
avg_res_len = sum(non_zero_res_lens) / max(len(non_zero_res_lens), 1)
precision = 0
if avg_res_len > 0:
precision = 1 / avg_res_len
fm = f_measure(precision, recall) * trust
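# Illustrative numbers (added comment, not in the original code): with 4 GTPs,
# gt_matches summing to 2 and res_lengths of [2, 4, 0, 0], recall is 2/4 = 0.5,
# avg_res_len is (2+4)/2 = 3 and precision is 1/3; a soft timeout would halve
# fm via trust=0.5, a hard timeout would zero it.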
gtp_precisions = OrderedDict()
gain = 0
if qtime_exceeded == 0:
for gtp in matching_node_pairs:
gtp_res_len = res_lengths[gtp]
if gtp_res_len > 0:
gtp_precision = 1 / gtp_res_len
gtp_precisions[gtp] = gtp_precision
score_diff = gtp_precision - gtp_max_precisions[gtp]
if score_diff > 0:
gain += score_diff
# counteract overfitting
overfitting = 1
if matching_node_pairs:
m_sources, m_targets = zip(*matching_node_pairs)
if len(set(m_sources)) < 2:
overfitting *= config.OVERFITTING_PUNISHMENT
if len(set(m_targets)) < 2:
overfitting *= config.OVERFITTING_PUNISHMENT
score = trust * overfitting * gain
# order of res needs to fit to graph_pattern.GPFitness
res = (
remaining_gain,
score,
gain,
fm,
avg_res_len,
sum_gt_matches,
pattern_length,
pattern_vars,
qtime_exceeded,
query_time,
)
logger.log(
config.LOGLVL_EVAL,
'Run %d, Generation %d: evaluated fitness for %s%s\n%s',
run, gen,
graph_pattern,
GPFitness(res).format_fitness(),
graph_pattern.fitness.description
)
return res, matching_node_pairs, gtp_precisions
def update_individuals(individuals, eval_results):
"""Updates the given individuals with the eval_results in-place.
:param individuals: A list of individuals (GraphPatterns).
:param eval_results: a list of results calculated by evaluate(), in the
order of individuals.
:return: None
"""
for ind, res in zip(individuals, eval_results):
ind.fitness.values = res[0]
ind.matching_node_pairs = res[1]
ind.gtp_precisions = res[2]
@lru_cache(maxsize=config.CACHE_SIZE_FIT_TO_LIVE)
def fit_to_live(child):
if not (1 <= len(child) <= config.MAX_PATTERN_LENGTH):
return False
if not child.vars_in_graph & {SOURCE_VAR, TARGET_VAR}:
return False
if len(child.vars_in_graph) > config.MAX_PATTERN_VARS:
return False
if len(child.to_sparql_select_query()) > config.MAX_PATTERN_QUERY_SIZE:
return False
if any([
len(o.n3()) > config.MAX_LITERAL_SIZE
for _, _, o in child if isinstance(o, Literal)
]):
return False
if any([
isinstance(s, Literal) or isinstance(p, (BNode, Literal))
for s, p, _ in child
]):
return False
if not child.is_connected(via_edges=config.PATTERN_P_CONNECTED):
return False
return True
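# Illustrative sketch (added comment, hypothetical pattern): a minimal pattern
# such as
#   GraphPattern([(SOURCE_VAR, Variable('p'), TARGET_VAR)])
# passes fit_to_live (connected, contains ?source/?target, few vars), whereas a
# pattern whose only triple has a Literal in subject position, or one that
# contains neither ?source nor ?target, is rejected.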
def mate_helper(
overlap,
delta_dom,
delta_other,
pb_overlap,
pb_dom,
pb_other,
pb_rename_delta_vars,
retries,
):
assert isinstance(overlap, set)
assert isinstance(delta_dom, set)
assert isinstance(delta_other, set)
for _ in range(retries):
overlap_part = [t for t in overlap if random.random() < pb_overlap]
dom_part = [t for t in delta_dom if random.random() < pb_dom]
other_part = [t for t in delta_other if random.random() < pb_other]
if random.random() < pb_rename_delta_vars:
other_part = replace_vars_with_random_vars(other_part)
child = canonicalize(GraphPattern(overlap_part + dom_part + other_part))
if fit_to_live(child):
return child
else:
# most likely not connected, try connecting by merging var nodes
child = canonicalize(mutate_merge_var(child))
if fit_to_live(child):
return child
return None
def mate(
individual1,
individual2,
pb_overlap=config.CXPB_BP,
pb_dominant_parent=config.CXPB_DP,
pb_other_parent=config.CXPB_OP,
pb_rename_delta_vars=config.CXPB_RV,
retries=config.CX_RETRY,
):
# mate patterns:
# we return 2 children:
# child1 whose dominant parent is individual1
# child2 whose dominant parent is individual2
# both children draw triples from the overlap with pb_overlap.
# child1 draws each triple of individual1 with prob pb_dominant_parent and
# each triple of individual2 with prob pb_other_parent.
# child2 swaps the probabilities accordingly.
# the drawings are repeated retries times. If no fit_to_live child is found
# the dominant parent is returned
assert fit_to_live(individual1), 'unfit indiv in mating %r' % (individual1,)
assert fit_to_live(individual2), 'unfit indiv in mating %r' % (individual2,)
overlap = set(individual1) & set(individual2)
delta1 = set(individual1) - overlap
delta2 = set(individual2) - overlap
child1 = mate_helper(
overlap, delta1, delta2,
pb_overlap, pb_dominant_parent, pb_other_parent, pb_rename_delta_vars,
retries,
) or individual1
child2 = mate_helper(
overlap, delta2, delta1,
pb_overlap, pb_dominant_parent, pb_other_parent, pb_rename_delta_vars,
retries
) or individual2
assert fit_to_live(child1), 'mating %r and %r produced unfit child %r' % (
individual1, individual2, child1
)
assert fit_to_live(child2), 'mating %r and %r produced unfit child %r' % (
individual1, individual2, child2
)
return child1, child2
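# Hedged usage sketch (added comment, hypothetical predicates :a, :b, :c): for
#   individual1 = {(?source, :a, ?v1), (?v1, :b, ?target)}
#   individual2 = {(?source, :a, ?v1), (?v1, :c, ?target)}
# the overlap is {(?source, :a, ?v1)}, delta1 = {(?v1, :b, ?target)} and
# delta2 = {(?v1, :c, ?target)}; child1 then draws overlap triples with
# pb_overlap, delta1 triples with pb_dominant_parent and delta2 triples with
# pb_other_parent (child2 swaps the roles of delta1 and delta2).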
def mutate_introduce_var(child):
identifiers = tuple(child.identifier_counts(exclude_vars=True))
if not identifiers:
return child
identifier = random.choice(identifiers)
rand_var = gen_random_var()
return GraphPattern(child, mapping={identifier: rand_var})
def mutate_split_var(child):
# count triples that each var occurs in (not occurrences: (?s ?p ?s))
var_trip_count = Counter([
ti for t in child for ti in set(t) if isinstance(ti, Variable)
# if ti not in (SOURCE_VAR, TARGET_VAR) # why not allow them to split?
])
# select vars that occur multiple times
var_trip_count = Counter({i: c for i, c in var_trip_count.items() if c > 1})
if not var_trip_count:
return child
var_to_split = random.choice(list(var_trip_count.elements()))
triples_with_var = [t for t in child if var_to_split in t]
triples = [t for t in child if var_to_split not in t]
# randomly split triples_with_var into 2 non-zero length parts:
# the first part where var_to_split is substituted and the 2nd where not
random.shuffle(triples_with_var)
split_idx = random.randrange(1, len(triples_with_var))
triples += triples_with_var[split_idx:]
rand_var = gen_random_var()
triples += [
tuple([rand_var if ti == var_to_split else ti for ti in t])
for t in triples_with_var[:split_idx]
]
gp = GraphPattern(triples)
if not fit_to_live(gp):
# can happen that we created a disconnected pattern:
# orig:
# ?s ?p ?X, ?X ?q ?Y, ?Y ?r ?t
# splitvar X:
# ?s ?p ?Z, ?X ?q ?Y, ?Y ?r ?t
# try merging once, might lead to this:
# ?s ?p ?Y, ?X ?q ?Y, ?Y ?r ?t
gp = mutate_merge_var(gp)
if not fit_to_live(gp):
return child
return gp
def mutate_merge_var(child, pb_mv_mix=config.MUTPB_MV_MIX):
if random.random() < pb_mv_mix:
return mutate_merge_var_mix(child)
else:
return mutate_merge_var_sep(child)
def _mutate_merge_var_helper(vars_):
rand_vars = vars_ - {SOURCE_VAR, TARGET_VAR}
merge_able_vars = len(rand_vars) - 1
if len(vars_) > len(rand_vars):
# either SOURCE_VAR or TARGET_VAR is also available as merge target
merge_able_vars += 1
merge_able_vars = max(0, merge_able_vars)
return rand_vars, merge_able_vars
def mutate_merge_var_mix(child):
"""Merges two variables into one, potentially merging node and edge vars."""
vars_ = child.vars_in_graph
rand_vars, merge_able_vars = _mutate_merge_var_helper(vars_)
if merge_able_vars < 1:
return child
# merge vars, even mixing nodes and edges
var_to_replace = random.choice(list(rand_vars))
var_to_merge_into = random.choice(list(vars_ - {var_to_replace}))
return GraphPattern(child, mapping={var_to_replace: var_to_merge_into})
def mutate_merge_var_sep(child):
"""Merges two variables into one, won't merge node and edge vars.
Considers the node variables and edge variables separately.
Depending on availability either merges 2 node variables or 2 edge variables.
"""
node_vars = {n for n in child.nodes if isinstance(n, Variable)}
rand_node_vars, merge_able_node_vars = _mutate_merge_var_helper(node_vars)
edge_vars = {e for e in child.edges if isinstance(e, Variable)}
rand_edge_vars, merge_able_edge_vars = _mutate_merge_var_helper(edge_vars)
if merge_able_node_vars < 1 and merge_able_edge_vars < 1:
return child
# randomly merge node or predicate vars proportional to their occurrences
r = random.randrange(0, merge_able_node_vars + merge_able_edge_vars)
if r < merge_able_node_vars:
# we're merging node vars
var_to_replace = random.choice(list(rand_node_vars))
var_to_merge_into = random.choice(list(
node_vars - {var_to_replace}))
else:
# we're merging predicate vars
var_to_replace = random.choice(list(rand_edge_vars))
var_to_merge_into = random.choice(list(
edge_vars - {var_to_replace}))
return GraphPattern(child, mapping={var_to_replace: var_to_merge_into})
def mutate_del_triple(child):
l = len(child)
if l < 2:
return child
new_child = GraphPattern(random.sample(child, l - 1))
if not fit_to_live(new_child):
return child
else:
return new_child
def _mutate_expand_node_helper(node, pb_en_out_link=config.MUTPB_EN_OUT_LINK):
"""Adds a new var-only triple to node.
:param pb_en_out_link: Probability to create an outgoing triple.
:return: The new triple, its new node var and its new edge var.
"""
var_edge = gen_random_var()
var_node = gen_random_var()
if random.random() < pb_en_out_link:
new_triple = (node, var_edge, var_node)
else:
new_triple = (var_node, var_edge, node)
return new_triple, var_node, var_edge
def mutate_expand_node(
child, node=None, pb_en_out_link=config.MUTPB_EN_OUT_LINK):
"""Expands a random node by adding a new var-only triple to it.
Randomly selects a node. Then adds an outgoing or incoming triple with two
new vars to it.
:param child: The GraphPattern to expand a node in.
:param node: If given, the node to expand; otherwise a random node is chosen.
:param pb_en_out_link: Probability to create an outgoing triple.
:return: A child with the added outgoing/incoming triple.
"""
# TODO: can maybe be improved by sparqling
if not node:
nodes = list(child.nodes)
node = random.choice(nodes)
new_triple, _, _ = _mutate_expand_node_helper(node, pb_en_out_link)
return child + (new_triple,)
def mutate_add_edge(child):
"""Adds an edge between 2 randomly selected nodes.
Randomly selects two nodes, then adds a new triple (n1, e, n2), where e is
a new variable.
:return: A child with the added edge.
"""
# TODO: can maybe be improved by sparqling
nodes = list(child.nodes)
if len(nodes) < 2:
return child
node1, node2 = random.sample(nodes, 2)
var_edge = gen_random_var()
new_triple = (node1, var_edge, node2)
return child + (new_triple,)
def mutate_increase_dist(child):
"""Increases the distance between ?source and ?target by one hop.
Randomly adds a var only triple to the ?source or ?target var. Then swaps
the new node with ?source/?target to increase the distance by one hop.
:return: A child with increased distance between ?source and ?target.
"""
if not child.complete():
return child
var_node = gen_random_var()
var_edge = gen_random_var()
old_st = random.choice([SOURCE_VAR, TARGET_VAR])
new_triple = random.choice([
(old_st, var_edge, var_node), # outgoing new triple
(var_node, var_edge, old_st), # incoming new triple
])
new_child = child + (new_triple,)
# replace the old source/target node with the new node and vice-versa to
# move the old node one hop further away from everything else
new_child = new_child.replace({old_st: var_node, var_node: old_st})
return new_child
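# Worked example (added comment): starting from {(?source, ?e1, ?target)} and
# picking old_st = ?target with an outgoing new triple (?target, ?e2, ?n),
# the subsequent swap of ?target and ?n yields
#   {(?source, ?e1, ?n), (?n, ?e2, ?target)}
# so ?target is now one hop further away from ?source.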
def mutate_fix_var_filter(item_counts):
"""Filters results of fix var mutation in-place.
Excludes:
- too long literals
- URIs with encoding errors (real world!)
- BNode results (they will not be fixed but stay SPARQL vars)
- NaN or INF literals (Virtuoso bug
https://github.com/openlink/virtuoso-opensource/issues/649 )
"""
assert isinstance(item_counts, Counter)
for i in list(item_counts.keys()):
if isinstance(i, Literal):
i_n3 = i.n3()
if len(i_n3) > config.MAX_LITERAL_SIZE:
logger.debug(
'excluding very long literal %d > %d from mutate_fix_var:\n'
'%s...',
len(i_n3), config.MAX_LITERAL_SIZE, i_n3[:128]
)
del item_counts[i]
elif i.datatype in (XSD['float'], XSD['double']) \
and six.text_type(i).lower() in ('nan', 'inf'):
logger.debug('excluding %s due to Virtuoso Bug', i_n3)
del item_counts[i]
elif isinstance(i, URIRef):
# noinspection PyBroadException
try:
i.n3()
except Exception: # sadly RDFLib doesn't raise a more specific one
# it seems some SPARQL endpoints (Virtuoso) are quite liberal
# during their import process, so it can happen that we're
# served broken URIs, which break when re-inserted into SPARQL
# later by calling URIRef.n3()
logger.warning(
'removed invalid URI from mutate_fix_var:\n%r',
i
)
del item_counts[i]
elif isinstance(i, BNode):
# make sure that BNodes stay variables
logger.info('removed BNode from mutate_fix_var')
del item_counts[i]
else:
logger.warning(
'excluding unknown result type from mutate_fix_var:\n%r',
i
)
del item_counts[i]
@exception_stack_catcher
def mutate_fix_var(
sparql,
timeout,
gtp_scores,
child,
gtp_sample_max_n=config.MUTPB_FV_RGTP_SAMPLE_N,
rand_var=None,
sample_max_n=config.MUTPB_FV_SAMPLE_MAXN,
limit=config.MUTPB_FV_QUERY_LIMIT,
):
"""Finds possible fixations for a randomly selected variable of the pattern.
This is a very important mutation of the gp learner, as it is the main
source of actually gaining information from the SPARQL endpoint.
The outline of the mutation is as follows:
- If not passed in, randomly selects a variable (rand_var) of the pattern
(node or edge var, excluding ?source and ?target).
- Randomly selects a subset of up to gtp_sample_max_n GTPs with
probabilities according to their remaining gains. The number of GTPs
picked is randomized (see below).
- Issues SPARQL queries to find possible fixations for the selected variable
under the previously selected GTPs subset. Counts the fixation's
occurrences wrt. the GTPs and sorts the result descending by these counts.
- Limits the result rows to deal with potential long-tails.
- Filters the resulting rows with mutate_fix_var_filter.
- From the limited, filtered result rows randomly selects up to sample_max_n
candidate fixations with probabilities according to their counts.
- For each candidate fixation returns a child in which rand_var is replaced
with the candidate fixation.
The reasons for fixing rand_var based on a randomly sized subset of GTPs
are efficiency and shadowing problems with common long-tails. Due to the
later imposed limit (which is vital in real world use-cases),
a few remaining GTPs that share more than `limit` potential fixations (so
have a common long-tail) could otherwise hide solutions for other
remaining GTPs. This can be the case if these common fixations have low
fitness. By randomizing the subset size, we will eventually (and more
likely) select other combinations of remaining GTPs.
:param sparql: SPARQLWrapper endpoint.
:param timeout: Timeout in seconds for each individual query (gp).
:param gtp_scores: Current GTPScores object for sampling.
:param child: a graph pattern to mutate.
:param gtp_sample_max_n: Maximum GTPs subset size to base fixations on.
:param rand_var: If given uses this variable instead of a random one.
:param sample_max_n: Maximum number of children.
:param limit: SPARQL limit for the top-k result rows.
:return: A list of children in which the selected variable is substituted
with fixation candidates wrt. GTPs.
"""
assert isinstance(child, GraphPattern)
assert isinstance(gtp_scores, GTPScores)
# The further we get, the fewer gtps are remaining. Sampling too many (all)
# of them might hurt, as common substitutions (> limit ones) which are dead
# ends could cover less common ones that could actually help.
gtp_sample_max_n = min(gtp_sample_max_n, int(gtp_scores.remaining_gain))
gtp_sample_max_n = random.randint(1, gtp_sample_max_n)
ground_truth_pairs = gtp_scores.remaining_gain_sample_gtps(
max_n=gtp_sample_max_n)
rand_vars = child.vars_in_graph - {SOURCE_VAR, TARGET_VAR}
if len(rand_vars) < 1:
return [child]
if rand_var is None:
rand_var = random.choice(list(rand_vars))
t, substitution_counts = variable_substitution_query(
sparql, timeout, child, rand_var, ground_truth_pairs, limit)
if not substitution_counts:
# the current pattern is unfit, as we can't find anything fulfilling it
logger.debug("tried to fix a var %s without result:\n%s"
"seems as if the pattern can't be fulfilled!",
rand_var, child)
return [child]
mutate_fix_var_filter(substitution_counts)
if not substitution_counts:
# could have happened that we removed the only possible substitution
return [child]
# randomly pick n of the substitutions with a prob ~ to their counts
items, counts = zip(*substitution_counts.most_common())
substs = sample_from_list(items, counts, sample_max_n)
logger.log(
config.LOGLVL_MUTFV,
'fixed variable %s in %sto:\n %s\n<%d out of:\n%s\n',
rand_var.n3(),
child,
'\n '.join([subst.n3() for subst in substs]),
sample_max_n,
'\n'.join([' %d: %s' % (c, v.n3())
for v, c in substitution_counts.most_common()]),
)
res = [
GraphPattern(child, mapping={rand_var: subst})
for subst in substs
]
return res
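# Hedged usage sketch (added comment, assumes a reachable SPARQL endpoint
# behind `sparql` and a current GTPScores object):
#   children = mutate_fix_var(sparql, timeout, gtp_scores, child)
# For a pattern like {(?source, ?v1, ?target)} this typically yields several
# copies with ?v1 fixed to predicates that frequently connect the sampled
# GTPs; the concrete fixations depend entirely on the endpoint's data.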
def mutate_simplify_pattern(gp):
if len(gp) < 2:
return gp
orig_gp = gp
logger.debug('simplifying pattern\n%s', gp)
# remove parallel variable edges (single variables only)
# e.g., [ :x ?v1 ?y . :x ?v2 ?y. ] should remove :x ?v2 ?y.
identifier_counts = gp.identifier_counts()
edge_vars = [edge for edge in gp.edges if isinstance(edge, Variable)]
# note that we also count occurrences in non-edge positions just to be safe!
edge_var_counts = Counter({v: identifier_counts[v] for v in edge_vars})
edge_vars_once = [var for var, c in edge_var_counts.items() if c == 1]
for var in sorted(edge_vars_once, reverse=True):
var_triple = [(s, p, o) for s, p, o in gp if p == var][0] # only one
s, _, o = var_triple
parallel_triples = [
t for t in gp if (t[0], t[2]) == (s, o) and t[1] != var
]
if parallel_triples:
# remove alpha-num largest var triple
gp -= [var_triple]
# remove edges between fixed nodes (fixed and single var edges from above)
fixed_node_triples = [
(s, p, o) for s, p, o in gp
if not isinstance(s, Variable) and not isinstance(o, Variable)
]
gp -= [
(s, p, o) for s, p, o in fixed_node_triples
if not isinstance(p, Variable) or p in edge_vars_once
]
# remove unrestricting leaf edges (single occurring vars only) and leaves
# behind fixed nodes
# more explicit: such edges are
# - single occurrence edge vars with a single occ gen var node,
# so (x, ?vp, ?vn) or (?vn, ?vp, x)
# - or single occ gen var node behind fixed nodes
old_gp = None
while old_gp != gp:
old_gp = gp
gen_var_counts = gp.var_counts()
for i in (SOURCE_VAR, TARGET_VAR):
# remove all non generated vars from gen_var_counts
del gen_var_counts[i]
for t in gp:
counts = map(lambda x: gen_var_counts[x], t)
s, p, o = t
# get counts for s, p and p, o. if [1, 1] remove triple
if ((
# single occ edge var with single node var
counts[0:2] == [1, 1] or counts[1:3] == [1, 1]
) or (
# single occ gen var node behind fixed node
counts[1] == 0 and (
(not isinstance(s, Variable) and counts[2] == 1) or
(not isinstance(o, Variable) and counts[0] == 1)
)
)):
gp -= [t]
# TODO: parallel edges like
# ?s <p> ?v1 . ?v1 <q> ?t .
# ?s <p> ?v2 . ?v2 <q> ?t .
# TODO: remove fixed edge only connected patterns like
# ?s <p> ?t . <x> <p> ?v1 . ?v1 ?v2 <y> .
# TODO: maybe remove disconnected components (relevance in reality?)
if len(gp) < 1:
# for example: ?s ?v1 ?v2 .
logger.log(
config.LOGLVL_MUTSP,
'simplification of the following pattern resulted in empty pattern,'
' returning original pattern:\n%s',
orig_gp,
)
return orig_gp
if orig_gp == gp:
logger.log(
config.LOGLVL_MUTSP,
'simplification had no effect on pattern:\n%s',
gp,
)
else:
logger.log(
config.LOGLVL_MUTSP,
'successfully simplified pattern:\n%swas simplified to:\n%s',
orig_gp,
gp,
)
return gp
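# Worked example (added comment, hypothetical URIs): for
#   {(?source, :p, :x), (:x, ?v1, ?y), (:x, ?v2, ?y), (?y, :q, ?target)}
# ?v2 is a single-occurrence edge var running parallel to (:x, ?v1, ?y), so
# (:x, ?v2, ?y) is dropped; the remaining triples survive the later pruning
# steps since their vars stay connected to ?source and ?target.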
@exception_stack_catcher
def mutate(
sparql,
timeout,
gtp_scores,
child,
pb_ae=config.MUTPB_AE,
pb_dt=config.MUTPB_DT,
pb_en=config.MUTPB_EN,
pb_fv=config.MUTPB_FV,
pb_id=config.MUTPB_ID,
pb_iv=config.MUTPB_IV,
pb_mv=config.MUTPB_MV,
pb_sp=config.MUTPB_SP,
pb_sv=config.MUTPB_SV,
):
# mutate patterns:
# grow: select random identifier and convert it into a var (local)
# grow: select var and randomly split in 2 vars (local)
# shrink: merge 2 vars (local)
# shrink: del triple (local)
# grow: select random node, add edge with / without neighbor (local for now)
# grow: select random 2 nodes, add edge between (local for now)
# grow: increase distance between source and target by moving one of them a hop away
# shrink: fix variable (SPARQL)
# shrink: simplify pattern
orig_child = child
assert fit_to_live(child), 'mutation on unfit child: %r' % (child,)
if random.random() < pb_iv:
child = mutate_introduce_var(child)
if random.random() < pb_sv:
child = mutate_split_var(child)
if random.random() < pb_mv:
child = mutate_merge_var(child)
if random.random() < pb_dt:
child = mutate_del_triple(child)
if random.random() < pb_en:
child = mutate_expand_node(child)
if random.random() < pb_ae:
child = mutate_add_edge(child)
if random.random() < pb_id:
child = mutate_increase_dist(child)
if random.random() < pb_sp:
child = mutate_simplify_pattern(child)
if random.random() < pb_fv:
child = canonicalize(child)
children = mutate_fix_var(sparql, timeout, gtp_scores, child)
else:
children = [child]
# TODO: deep & narrow paths mutation
children = {
c if fit_to_live(c) else orig_child
for c in children
}
children = {
canonicalize(c) for c in children
}
return list(children)
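# Note (added): mutate() returns a list of children rather than a single one,
# since mutate_fix_var can fan out into several candidate fixations, e.g.:
#   children = mutate(sparql, timeout, gtp_scores, some_pattern)
# Callers such as train() below therefore extend the offspring with all
# returned children.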
def train(toolbox, population, run):
hall_of_fame = deap.tools.HallOfFame(config.HOFSIZE)
# pop = toolbox.population(n=50)
pop = population
g = 0
logger.info(
'Run %d, Generation %d: %d individuals',
run, g, len(pop)
)
logger.debug('Population: %r', pop)
# Evaluate the entire population
_evaluate = partial(toolbox.evaluate, run=run, gen=g)
eval_results = list(parallel_map(_evaluate, pop))
logger.info(
'Run %d, Generation %d: evaluated %d individuals',
run, g, len(pop)
)
logger.debug('Evaluation results: %r', eval_results)
update_individuals(pop, eval_results)
hall_of_fame.update(pop)
best_individual = hall_of_fame[0]
best_individual_gen = g
if not toolbox.generation_step_callback(g, pop):
logger.info(
'terminating learning as requested by generation_step_callback'
)
return g, pop, hall_of_fame
# TODO: don't double eval same pattern? maybe a bit of redundancy is good?
# TODO: increase timeout if > x % of population fitnesses show timeout
for g in range(1, config.NGEN + 1):
# Select the next generation individuals
offspring = toolbox.select(pop)
logger.info(
"Run %d, Generation %d: selected %d offspring individuals",
run, g, len(offspring)
)
logger.debug('Offspring: %r', offspring)
# Clone the selected individuals
# offspring = map(toolbox.clone, offspring)
# Apply crossover and mutation on the offspring
tmp = []
for child1, child2 in zip(offspring[::2], offspring[1::2]):
if random.random() < config.CXPB:
child1, child2 = toolbox.mate(child1, child2)
tmp.append(child1)
tmp.append(child2)
offspring = tmp
logger.info(
"Run %d, Generation %d: %d individuals after mating",
run, g, len(offspring)
)
logger.debug('Offspring: %r', offspring)
mutants = []
tmp = []
for child in offspring:
if random.random() < config.MUTPB:
mutants.append(child)
else:
tmp.append(child)
offspring = tmp
logger.debug('Offspring in gen %d to mutate: %r', g, mutants)
mutant_children = list(parallel_map(toolbox.mutate, mutants))
logger.debug('Mutation results in gen %d: %r', g, mutant_children)
for mcs in mutant_children:
offspring.extend(mcs)
logger.info(
"Run %d, Generation %d: %d individuals after mutation",
run, g, len(offspring)
)
logger.debug('Offspring: %r', offspring)
# don't completely replace pop, but keep good individuals
# will draw individuals from the first 10 % of the HOF
# CDF: 0: 40 %, 1: 64 %, 2: 78 %, 3: 87 %, 4: 92 %, 5: 95 %, ...
l = len(hall_of_fame)
offspring += [
hall_of_fame[int(random.betavariate(1, 50) * (l - 1))]
for _ in range(config.HOFPAT_REINTRO)
]
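# Derivation of the CDF comment above (added): random.betavariate(1, 50) has
# CDF F(x) = 1 - (1 - x)**50, so P[index <= k] = 1 - (1 - (k + 1)/(l - 1))**50,
# which for a reasonably large hall of fame gives roughly 40 %, 64 %, 78 %,
# 87 %, 92 %, 95 % for k = 0..5 as listed.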
# always re-introduce some variable patterns with ?source and ?target
offspring += [
child for child in generate_variable_patterns(config.VARPAT_REINTRO)
if fit_to_live(child)
]
logger.info(
'Run %d, Generation %d: %d individuals after re-adding hall-of-fame'
' and variable patterns',
run, g, len(offspring)
)
logger.debug('Offspring: %r', offspring)
# Evaluate the individuals with an invalid fitness
invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
logger.info(
"Run %d, Generation %d: %d individuals to evaluate",
run, g, len(invalid_ind)
)
logger.debug('Evaluating individuals in gen %d: %r', g, invalid_ind)
_evaluate = partial(toolbox.evaluate, run=run, gen=g)
eval_results = list(parallel_map(_evaluate, invalid_ind))
logger.info(
"Run %d, Generation %d: %d individuals evaluated",
run, g, len(eval_results)
)
logger.debug('Evaluation results in gen %d: %r', g, eval_results)
update_individuals(invalid_ind, eval_results)
hall_of_fame.update(invalid_ind)
# replace population with this generation's offspring
pop[:] = offspring
logger.info(
"Run %d, Generation %d: %d individuals",
run, g, len(pop)
)
logger.debug('Population of generation %d: %r', g, pop)
if not toolbox.generation_step_callback(g, pop):
logger.info(
'terminating learning as requested by generation_step_callback'
)
break
if best_individual.fitness < hall_of_fame[0].fitness:
best_individual = hall_of_fame[0]
best_individual_gen = g
if g >= best_individual_gen + config.NGEN_NO_IMPROVEMENT:
logger.info(
'terminating learning after generation %d: '
'no better individual found since best in generation %d.',
g, best_individual_gen
)