source: framspy/evolalg/base/experiment_niching_abc.py @ 1194

Last change on this file since 1194 was 1194, checked in by Maciej Komosinski, 4 months ago

make_new_population() targets the desired popsize instead of preserving the input (current) population size

File size: 9.1 KB
Line 
1import time
2from abc import ABC, abstractmethod
3from tkinter import W
4
5import numpy as np
6from deap import base, tools
7from deap.tools.emo import assignCrowdingDist
8
9from ..constants import BAD_FITNESS
10from ..structures.individual import Individual
11from .experiment_abc import ExperimentABC
12from .remove_diagonal import remove_diagonal
13
14
class DeapFitness(base.Fitness):
    """Two-objective fitness container for DEAP's multi-objective selection.

    Both weights are positive; per the DEAP documentation this means both
    objectives are maximized.
    """

    weights = (1, 1)

    def __init__(self, *args, **kwargs):
        # Pure delegation; kept so the constructor signature stays explicit.
        super().__init__(*args, **kwargs)
20
21
22class ExperimentNiching(ExperimentABC, ABC):
23    fit: str = "niching"
24    normalize: str = "None"
25    archive_size: int = None
26
27    def __init__(self, fit, normalize, popsize, hof_size, save_only_best=True, knn_niching=5, knn_nslc=10, archive_size=0) -> None:
28        ExperimentABC.__init__(self,popsize=popsize, hof_size=hof_size, save_only_best=save_only_best)
29        self.fit = fit
30        self.normalize = normalize
31        self.knn_niching = knn_niching
32        self.knn_nslc = knn_nslc
33        self.archive_size=archive_size
34        if popsize < self.knn_niching:
35            self.knn_niching = popsize - 2
36        if popsize < self.knn_nslc:
37            self.knn_nslc = popsize - 2
38
39    def transform_indexes(self, i, index_array):
40        return [x+1 if x >= i else x for x in index_array]
41
42    def normalize_dissim(self, dissim_matrix):
43        dissim_matrix = remove_diagonal(np.array(dissim_matrix))
44        if self.normalize == "none":
45            return dissim_matrix
46        elif self.normalize == "max":
47            divide_by = np.max(dissim_matrix)
48        elif self.normalize == "sum":
49            divide_by = np.sum(dissim_matrix)
50        else:
51            raise Exception(f"Wrong normalization method,", self.normalize)
52        if divide_by != 0:
53            return dissim_matrix/divide_by
54        else:
55            return dissim_matrix
56
57    def do_niching(self, population_structures):
58        population_archive = population_structures.population + population_structures.archive
59        dissim_matrix = self.dissimilarity(population_archive)
60        if "knn" not in self.fit:
61            dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
62        else:
63            dissim_list = np.mean(np.partition(
64                self.normalize_dissim(dissim_matrix), self.knn_niching)[:, :self.knn_niching], axis=1)
65
66        if "niching" in self.fit:
67            for i, d in zip(population_archive, dissim_list):
68                i.fitness = i.rawfitness * d
69        elif "novelty" in self.fit:
70            for i, d in zip(population_archive, dissim_list):
71                i.fitness = d
72        else:
73            raise Exception("Wrong fit type: ", self.fit,
74                            f" chose correct one or implement new behavior")
75        population_structures.update_archive(dissim_matrix, population_archive)
76
77    def do_nsga2_dissim(self, population):
78        dissim_matrix = self.dissimilarity(population)
79        dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
80        for i, d in zip(population, dissim_list):
81            i.fitness = DeapFitness(tuple((d, i.rawfitness)))
82
83    def do_nslc_dissim(self, population):
84        dissim_matrix = self.dissimilarity(population)
85        normalized_matrix = self.normalize_dissim(dissim_matrix)
86        for i in range(len(normalized_matrix)):
87            temp_dissim = normalized_matrix[i]
88            index_array = np.argpartition(temp_dissim, kth=self.knn_nslc, axis=-1)[:self.knn_nslc]
89            dissim_value = np.mean(np.take_along_axis(
90                temp_dissim, index_array, axis=-1))
91            temp_fitness = population[i].rawfitness
92            population_of_most_different = list(
93                map(population.__getitem__, self.transform_indexes(i, index_array)))
94            temp_ind_fit = sum(
95                [1 for ind in population_of_most_different if ind.rawfitness < temp_fitness])
96            population[i].fitness = DeapFitness(
97                tuple((dissim_value, temp_ind_fit)))
98
99    def make_new_population_nsga2(self, population, prob_mut, prob_xov):
100        expected_mut = int(self.popsize * prob_mut)
101        expected_xov = int(self.popsize * prob_xov)
102        assert expected_mut + expected_xov <= self.popsize, "If probabilities of mutation (%g) and crossover (%g) added together exceed 1.0, then the population would grow every generation..." % (prob_mut, prob_xov)
103        assignCrowdingDist(population)
104        offspring = tools.selTournamentDCD(population, self.popsize)
105
106        def addGenotypeIfValid(ind_list, genotype):
107            new_individual = Individual()
108            new_individual.set_and_evaluate(genotype, self.evaluate)
109            if new_individual.fitness is not BAD_FITNESS:
110                ind_list.append(new_individual)
111
112        counter = 0
113
114        def get_individual(pop, c):
115            if c < len(pop):
116                ind = pop[c]
117                c += 1
118                return ind, c
119            else:
120                c = 0
121                ind = pop[c]
122                c += 1
123                return ind, c
124
125        newpop = []
126        while len(newpop) < expected_mut:
127            ind, counter = get_individual(offspring, counter)
128            addGenotypeIfValid(newpop, self.mutate(ind.genotype))
129
130        # adding valid crossovers of selected individuals...
131        while len(newpop) < expected_mut + expected_xov:
132            ind1, counter = get_individual(offspring, counter)
133            ind2, counter = get_individual(offspring, counter)
134            addGenotypeIfValid(newpop, self.cross_over(ind1.genotype, ind2.genotype))
135
136        # select clones to fill up the new population until we reach the same size as the input population
137        while len(newpop) < len(population):
138            ind, counter = get_individual(offspring, counter)
139            newpop.append(Individual().copyFrom(ind))
140
141        pop_offspring = population+newpop
142        print(len(pop_offspring))
143        if self.fit == "nslc":
144            self.do_nslc_dissim(pop_offspring)
145        elif self.fit == "nsga2":
146            self.do_nsga2_dissim(pop_offspring)
147        out_pop = tools.selNSGA2(pop_offspring, len(population))
148        return out_pop
149
150    def evolve(self, hof_savefile, generations, initialgenotype, pmut, pxov, tournament_size):
151        file_name = self.get_state_filename(hof_savefile)
152        state = self.load_state(file_name)
153        if state is not None:  # loaded state from file
154            # saved generation has been completed, start with the next one
155            self.current_generation += 1
156            print("...Resuming from saved state: population size = %d, hof size = %d, stats size = %d, archive size = %d, generation = %d/%d" % (len(self.population_structures.population), len(self.hof),
157                                                                                                                                                 len(self.stats),  (len(self.population_structures.archive)), self.current_generation, generations))  # self.current_generation (and g) are 0-based, parsed_args.generations is 1-based
158        else:
159            self.initialize_evolution(self.genformat, initialgenotype)
160
161        time0 = time.process_time()
162        for g in range(self.current_generation, generations):
163            if self.fit != "raw" and self.fit != "nsga2" and self.fit != "nslc":
164                self.do_niching(self.population_structures)
165
166            if type(self.population_structures.population[0].fitness) == DeapFitness:
167                self.population_structures.population = self.make_new_population_nsga2(
168                    self.population_structures.population, pmut, pxov)
169            else:
170                self.population_structures.population = self.make_new_population(
171                    self.population_structures.population, pmut, pxov, tournament_size)
172
173            self.update_stats(g, self.population_structures.population)
174
175            if hof_savefile is not None:
176                self.current_generation = g
177                self.time_elapsed += time.process_time() - time0
178                self.save_state(file_name)
179        if hof_savefile is not None:
180            self.save_genotypes(hof_savefile)
181        return self.population_structures.population, self.stats
182
183    @staticmethod
184    def get_args_for_parser():
185        parser = ExperimentABC.get_args_for_parser()
186        parser.add_argument("-dissim",type= int, default= "frams",
187                   help="Dissimilarity measure type. Availible -2:emd, -1:lev, 1:frams1 (default}, 2:frams2")
188        parser.add_argument("-fit",type= str, default= "raw",
189                        help="Fitness type, availible  types: niching, novelty, nsga2, nslc and raw (default}")
190        parser.add_argument("-archive",type= int, default= 50,
191                            help="Maximum archive size")
192        parser.add_argument("-normalize",type= str, default= "max",
193                            help="What normalization use for dissimilarity matrix, max (default}, sum and none")
194        parser.add_argument("-knn",type= int, default= 0,
195                        help="Nearest neighbors parameter for local novelty/niching, if knn==0 global is performed.Default:0")
196        return parser
197       
198    @abstractmethod
199    def dissimilarity(self, population: list):
200        pass
Note: See TracBrowser for help on using the repository browser.