1 | import argparse |
2 | import json |
3 | import os |
4 | import pickle |
5 | import time |
6 | from abc import ABC, abstractmethod |
7 | |
8 | from ..base.random_sequence_index import RandomIndexSequence |
9 | from ..constants import BAD_FITNESS |
10 | from ..json_encoders import Encoder |
11 | from ..structures.hall_of_fame import HallOfFame |
12 | from ..structures.individual import Individual |
13 | from ..structures.population import PopulationStructures |
14 | |
15 | |
16 | class ExperimentABC(ABC): |
17 | |
18 | population_structures = None |
19 | hof = [] |
20 | stats = [] |
21 | current_generation = 0 |
22 | time_elapsed = 0 |
23 | |
24 | |
25 | def __init__(self, popsize, hof_size, save_only_best=True) -> None: |
26 | self.hof = HallOfFame(hof_size) |
27 | self.popsize = popsize |
28 | self.save_only_best = save_only_best |
29 | |
30 | def select(self, individuals, tournament_size, random_index_sequence): |
31 | """Tournament selection, returns the index of the best individual from those taking part in the tournament""" |
32 | best_index = None |
33 | for i in range(tournament_size): |
34 | rnd_index = random_index_sequence.getNext() |
35 | if best_index is None or individuals[rnd_index].fitness > best_index.fitness: |
36 | best_index = individuals[rnd_index] |
37 | return best_index |
38 | |
39 | def addGenotypeIfValid(self, ind_list, genotype): |
40 | new_individual = Individual() |
41 | new_individual.set_and_evaluate(genotype, self.evaluate) |
42 | if new_individual.fitness is not BAD_FITNESS: |
43 | ind_list.append(new_individual) |
44 | |
45 | def make_new_population(self, individuals, prob_mut, prob_xov, tournament_size): |
46 | """'individuals' is the input population (a list of individuals). |
47 | Assumptions: all genotypes in 'individuals' are valid and evaluated (have fitness set). |
48 | Returns: a new population of size 'self.popsize' with prob_mut mutants, prob_xov offspring, and the remainder of clones.""" |
49 | |
50 | newpop = [] |
51 | expected_mut = int(self.popsize * prob_mut) |
52 | expected_xov = int(self.popsize * prob_xov) |
53 | assert expected_mut + expected_xov <= self.popsize, "If probabilities of mutation (%g) and crossover (%g) added together exceed 1.0, then the population would grow every generation..." % (prob_mut, prob_xov) |
54 | ris = RandomIndexSequence(len(individuals)) |
55 | |
56 | # adding valid mutants of selected individuals... |
57 | while len(newpop) < expected_mut: |
58 | ind = self.select(individuals, tournament_size=tournament_size, random_index_sequence=ris) |
59 | self.addGenotypeIfValid(newpop, self.mutate(ind.genotype)) |
60 | |
61 | # adding valid crossovers of selected individuals... |
62 | while len(newpop) < expected_mut + expected_xov: |
63 | ind1 = self.select(individuals, tournament_size=tournament_size, random_index_sequence=ris) |
64 | ind2 = self.select(individuals, tournament_size=tournament_size, random_index_sequence=ris) |
65 | self.addGenotypeIfValid(newpop, self.cross_over(ind1.genotype, ind2.genotype)) |
66 | |
67 | # select clones to fill up the new population until we reach the same size as the input population |
68 | while len(newpop) < self.popsize: |
69 | ind = self.select(individuals, tournament_size=tournament_size, random_index_sequence=ris) |
70 | newpop.append(Individual().copyFrom(ind)) |
71 | |
72 | return newpop |
73 | |
74 | def save_state(self, state_filename): |
75 | state = self.get_state() |
76 | if state_filename is None: |
77 | return |
78 | state_filename_tmp = state_filename + ".tmp" |
79 | try: |
80 | with open(state_filename_tmp, "wb") as f: |
81 | pickle.dump(state, f) |
82 | # ensures the new file was first saved OK (e.g. enough free space on device), then replace |
83 | os.replace(state_filename_tmp, state_filename) |
84 | except Exception as ex: |
85 | raise RuntimeError("Failed to save evolution state '%s' (because: %s). This does not prevent the experiment from continuing, but let's stop here to fix the problem with saving state files." % ( |
86 | state_filename_tmp, ex)) |
87 | |
88 | def load_state(self, state_filename): |
89 | if state_filename is None: |
90 | # print("Loading evolution state: file name not provided") |
91 | return None |
92 | try: |
93 | with open(state_filename, 'rb') as f: |
94 | state = pickle.load(f) |
95 | self.set_state(state) |
96 | except FileNotFoundError: |
97 | return None |
98 | print("...Loaded evolution state from '%s'" % state_filename) |
99 | return True |
100 | |
101 | def get_state_filename(self, save_file_name): |
102 | return None if save_file_name is None else save_file_name + '_state.pkl' |
103 | |
104 | def get_state(self): |
105 | return [self.time_elapsed, self.current_generation, self.population_structures, self.hof, self.stats] |
106 | |
107 | def set_state(self, state): |
108 | self.time_elapsed, self.current_generation, self.population_structures, hof_, self.stats = state |
109 | # sorting: ensure that we add from worst to best so all individuals are added to HOF |
110 | for h in sorted(hof_, key=lambda x: x.rawfitness): |
111 | self.hof.add(h) |
112 | |
113 | def update_stats(self, generation, all_individuals): |
114 | worst = min(all_individuals, key=lambda item: item.rawfitness) |
115 | best = max(all_individuals, key=lambda item: item.rawfitness) |
116 | # instead of single best, could add all individuals in population here, but then the outcome would depend on the order of adding |
117 | self.hof.add(best) |
118 | self.stats.append( |
119 | best.rawfitness if self.save_only_best else best) |
120 | print("%d\t%d\t%g\t%g" % (generation, len( |
121 | all_individuals), worst.rawfitness, best.rawfitness)) |
122 | |
123 | def initialize_evolution(self, initialgenotype): |
124 | self.current_generation = 0 |
125 | self.time_elapsed = 0 |
126 | self.stats = [] # stores the best individuals, one from each generation |
127 | initial_individual = Individual() |
128 | initial_individual.set_and_evaluate(initialgenotype, self.evaluate) |
129 | self.hof.add(initial_individual) |
130 | self.stats.append( |
131 | initial_individual.rawfitness if self.save_only_best else initial_individual) |
132 | self.population_structures = PopulationStructures( |
133 | initial_individual=initial_individual, archive_size=0, popsize=self.popsize) |
134 | |
135 | def evolve(self, hof_savefile, generations, initialgenotype, pmut, pxov, tournament_size): |
136 | file_name = self.get_state_filename(hof_savefile) |
137 | state = self.load_state(file_name) |
138 | if state is not None: # loaded state from file |
139 | # saved generation has been completed, start with the next one |
140 | self.current_generation += 1 |
141 | print("...Resuming from saved state: population size = %d, hof size = %d, stats size = %d, archive size = %d, generation = %d/%d" % (len(self.population_structures.population), len(self.hof), |
142 | len(self.stats), (len(self.population_structures.archive)), self.current_generation, generations)) # self.current_generation (and g) are 0-based, parsed_args.generations is 1-based |
143 | |
144 | else: |
145 | self.initialize_evolution(initialgenotype) |
146 | time0 = time.process_time() |
147 | for g in range(self.current_generation, generations): |
148 | self.population_structures.population = self.make_new_population( |
149 | self.population_structures.population, pmut, pxov, tournament_size) |
150 | self.update_stats(g, self.population_structures.population) |
151 | if hof_savefile is not None: |
152 | self.current_generation = g |
153 | self.time_elapsed += time.process_time() - time0 |
154 | self.save_state(file_name) |
155 | if hof_savefile is not None: |
156 | self.save_genotypes(hof_savefile) |
157 | return self.population_structures.population, self.stats |
158 | |
159 | @abstractmethod |
160 | def mutate(self, gen1): |
161 | pass |
162 | |
163 | @abstractmethod |
164 | def cross_over(self, gen1, gen2): |
165 | pass |
166 | |
167 | @abstractmethod |
168 | def evaluate(self, genotype): |
169 | pass |
170 | |
171 | def save_genotypes(self, filename): |
172 | """Implement if you want to save finall genotypes,in default implementation this function is run once at the end of evolution""" |
173 | state_to_save = { |
174 | "number_of_generations": self.current_generation, |
175 | "hof": [{"genotype": individual.genotype, |
176 | "fitness": individual.rawfitness} for individual in self.hof.hof]} |
177 | with open(f"{filename}.json", 'w') as f: |
178 | json.dump(state_to_save, f, cls=Encoder) |
179 | |
180 | |
181 | @staticmethod |
182 | def get_args_for_parser(): |
183 | parser = argparse.ArgumentParser() |
184 | parser.add_argument('-popsize',type= int, default= 50, |
185 | help="Population size, default: 50.") |
186 | parser.add_argument('-generations',type= int, default= 5, |
187 | help="Number of generations, default: 5.") |
188 | parser.add_argument('-tournament',type= int, default= 5, |
189 | help="Tournament size, default: 5.") |
190 | parser.add_argument('-pmut',type= float, default= 0.7, |
191 | help="Probability of mutation, default: 0.7") |
192 | parser.add_argument('-pxov',type= float, default= 0.2, |
193 | help="Probability of crossover, default: 0.2") |
194 | parser.add_argument('-hof_size',type= int, default= 10, |
195 | help="Number of genotypes in Hall of Fame. Default: 10.") |
196 | parser.add_argument('-hof_savefile',type= str, required= False, |
197 | help= 'If set, Hall of Fame will be saved in Framsticks file format (recommended extension *.gen. This also activates saving state (checpoint} file and auto-resuming from the saved state, if this file exists.') |
198 | parser.add_argument('-save_only_best',type= bool, default= True, required= False, |
199 | help="") |
200 | |
201 | return parser |
