source: framspy/evolalg/base/experiment_niching_abc.py

Last change on this file was 1289, checked in by Maciej Komosinski, 3 months ago

fitness_set_negative_to_zero boolean (a.k.a. "only positive fitness", needed for novelty and niching diversity control) becomes a command-line flag instead of a hardcoded value

File size: 10.4 KB
Line 
1import time
2from abc import ABC, abstractmethod
3
4import numpy as np
5from deap import base, tools
6from deap.tools.emo import assignCrowdingDist
7
8from ..constants import BAD_FITNESS
9from ..structures.individual import Individual
10from .experiment_abc import ExperimentABC
11from .remove_diagonal import remove_diagonal
12
13
class DeapFitness(base.Fitness):
    """Two-objective fitness container built on DEAP's ``base.Fitness``.

    Instances are created in this file from 2-tuples: (dissimilarity, raw fitness)
    for NSGA2 and (dissimilarity, local competition score) for NSLC.
    """

    # One weight per objective; in DEAP a positive weight means the objective is maximized.
    weights = (1, 1)

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``base.Fitness``."""
        super().__init__(*args, **kwargs)
19
20
21class ExperimentNiching(ExperimentABC, ABC):
22    fit: str = "niching"
23    normalize: str = "None"
24    archive_size: int = None
25
26    def __init__(self, fit, normalize, popsize, hof_size, save_only_best, knn_niching, knn_nslc, archive_size) -> None:
27        ExperimentABC.__init__(self,popsize=popsize, hof_size=hof_size, save_only_best=save_only_best)
28        self.fit = fit
29        self.normalize = normalize
30        self.knn_niching = knn_niching # this parameter is used for local novelty and local niching
31        self.knn_nslc = knn_nslc
32        self.archive_size=archive_size
33
34        # np.argpartition requires these parameters to be at most popsize-2; popsize is decreased by 1 because we remove_diagonal()
35        if self.knn_niching > popsize - 2:
36            raise ValueError("knn_niching (%d) should be at most popsize-2 (%d)" % (self.knn_niching, popsize-2))
37        if self.knn_nslc > popsize - 2:
38            raise ValueError("knn_nslc (%d) should be at most popsize-2 (%d)" % (self.knn_nslc, popsize-2))
39
40
41    def transform_indexes(self, i, index_array):
42        return [x+1 if x >= i else x for x in index_array]
43
44    def normalize_dissim(self, dissim_matrix):
45        dissim_matrix = remove_diagonal(np.array(dissim_matrix)) # on the diagonal we usually have zeros (an individual is identical with itself, so the dissimilarity is 0). In some techniques we need to find "k" most similar individuals, so we remove the diagonal so that self-similarity of individuals does not interfere with finding "k" other most similar individuals. The matrix from square n*n turns into n*(n-1).
46        if self.normalize == "none":
47            return dissim_matrix
48        elif self.normalize == "max":
49            divide_by = np.max(dissim_matrix)
50        elif self.normalize == "sum":
51            divide_by = np.sum(dissim_matrix)
52        else:
53            raise ValueError("Wrong normalization method: '%s'" % self.normalize)
54        if divide_by != 0:
55            return dissim_matrix/divide_by
56        else:
57            return dissim_matrix
58
59    def do_niching(self, population_structures):
60        population_archive = population_structures.population + population_structures.archive
61        dissim_matrix = self.dissimilarity(population_archive)
62        if "knn" not in self.fit:
63            dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
64        else:
65            dissim_list = np.mean(np.partition(
66                self.normalize_dissim(dissim_matrix), self.knn_niching)[:, :self.knn_niching], axis=1)
67
68        if Individual.fitness_set_negative_to_zero is False and ("niching" in self.fit or "novelty" in self.fit):
69                raise ValueError("Negative fitness values not tested in combination with niching or novelty. When using these techniques, verify formulas or consider using the flag -fitness_set_negative_to_zero") # once the formulas are verified/improved, the command-line flag and this conditional check can be removed.
70
71        if "niching" in self.fit:
72            for i, d in zip(population_archive, dissim_list):
73                i.fitness = i.rawfitness * d
74        elif "novelty" in self.fit:
75            for i, d in zip(population_archive, dissim_list):
76                i.fitness = d
77        else:
78            raise ValueError("Unsupported fit type: '%s'. Use the correct type or implement a new behavior." % self.fit)
79        population_structures.update_archive(dissim_matrix, population_archive)
80
81    def do_nsga2_dissim(self, population):
82        dissim_matrix = self.dissimilarity(population)
83        dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
84        for i, d in zip(population, dissim_list):
85            i.fitness = DeapFitness(tuple((d, i.rawfitness)))
86
87    def do_nslc_dissim(self, population):
88        dissim_matrix = self.dissimilarity(population)
89        normalized_matrix = self.normalize_dissim(dissim_matrix)
90        for i in range(len(normalized_matrix)):
91            temp_dissim = normalized_matrix[i]
92            index_array = np.argpartition(temp_dissim, kth=self.knn_nslc, axis=-1)[:self.knn_nslc]
93            dissim_value = np.mean(np.take_along_axis(
94                temp_dissim, index_array, axis=-1))
95            temp_fitness = population[i].rawfitness
96            population_of_most_similar = list(
97                map(population.__getitem__, self.transform_indexes(i, index_array)))
98            temp_ind_fit = sum(
99                [1 for ind in population_of_most_similar if ind.rawfitness < temp_fitness])
100            population[i].fitness = DeapFitness(
101                tuple((dissim_value, temp_ind_fit)))
102
103    def make_new_population_nsga2(self, population, prob_mut, prob_xov):
104        expected_mut = int(self.popsize * prob_mut)
105        expected_xov = int(self.popsize * prob_xov)
106        assert expected_mut + expected_xov <= self.popsize, "If probabilities of mutation (%g) and crossover (%g) added together exceed 1.0, then the population would grow every generation..." % (prob_mut, prob_xov)
107        assignCrowdingDist(population)
108        offspring = tools.selTournamentDCD(population, self.popsize)
109
110        def addGenotypeIfValid(ind_list, genotype):
111            new_individual = Individual()
112            new_individual.set_and_evaluate(genotype, self.evaluate)
113            if new_individual.fitness is not BAD_FITNESS:
114                ind_list.append(new_individual)
115
116        counter = 0
117
118        def get_individual(pop, c):
119            if c < len(pop):
120                ind = pop[c]
121                c += 1
122                return ind, c
123            else:
124                c = 0
125                ind = pop[c]
126                c += 1
127                return ind, c
128
129        newpop = []
130        while len(newpop) < expected_mut:
131            ind, counter = get_individual(offspring, counter)
132            addGenotypeIfValid(newpop, self.mutate(ind.genotype))
133
134        # adding valid crossovers of selected individuals...
135        while len(newpop) < expected_mut + expected_xov:
136            ind1, counter = get_individual(offspring, counter)
137            ind2, counter = get_individual(offspring, counter)
138            addGenotypeIfValid(newpop, self.cross_over(ind1.genotype, ind2.genotype))
139
140        # select clones to fill up the new population until we reach the same size as the input population
141        while len(newpop) < len(population):
142            ind, counter = get_individual(offspring, counter)
143            newpop.append(Individual().copyFrom(ind))
144
145        pop_offspring = population + newpop # this is OK for NSGA2, but TODO verify if this should also be used for NSLC?
146        print(len(pop_offspring))
147        if self.fit == "nslc": # TODO should NSLC be also equipped with a novelty archive? (with an admittance threshold?)
148            self.do_nslc_dissim(pop_offspring)
149        elif self.fit == "nsga2":
150            self.do_nsga2_dissim(pop_offspring)
151        out_pop = tools.selNSGA2(pop_offspring, len(population))
152        return out_pop
153
154    def evolve(self, hof_savefile, generations, initialgenotype, pmut, pxov, tournament_size):
155        file_name = self.get_state_filename(hof_savefile)
156        state = self.load_state(file_name)
157        if state is not None:  # loaded state from file
158            # saved generation has been completed, start with the next one
159            self.current_generation += 1
160            print("...Resuming from saved state: population size = %d, hof size = %d, stats size = %d, archive size = %d, generation = %d/%d" % (len(self.population_structures.population), len(self.hof), len(self.stats), (len(self.population_structures.archive)), self.current_generation, generations))  # self.current_generation (and g) are 0-based, parsed_args.generations is 1-based
161        else:
162            self.initialize_evolution(self.genformat, initialgenotype)
163
164        time0 = time.process_time()
165        for g in range(self.current_generation, generations):
166            if self.fit != "raw" and self.fit != "nsga2" and self.fit != "nslc":
167                self.do_niching(self.population_structures)
168
169            if type(self.population_structures.population[0].fitness) == DeapFitness:
170                self.population_structures.population = self.make_new_population_nsga2(  # used both for nsga2 and nslc
171                    self.population_structures.population, pmut, pxov)
172            else:
173                self.population_structures.population = self.make_new_population(
174                    self.population_structures.population, pmut, pxov, tournament_size)
175
176            self.update_stats(g, self.population_structures.population)
177
178            if hof_savefile is not None:
179                self.current_generation = g
180                self.time_elapsed += time.process_time() - time0
181                self.save_state(file_name)
182        if hof_savefile is not None:
183            self.save_genotypes(hof_savefile)
184        return self.population_structures.population, self.stats
185
186    @staticmethod
187    def get_args_for_parser():
188        parser = ExperimentABC.get_args_for_parser()
189        parser.add_argument("-dissim",type= int, default=1,
190                   help="Dissimilarity measure type. Available: -3:freq, -2:dens, -1:Leven, 1:frams-struct (default}, 2:frams-descr")
191        parser.add_argument("-fit",type= str, default="raw",
192                        help="Fitness type, availible types: niching, novelty, knn_niching (local), knn_novelty (local), nsga2, nslc and raw (default)")
193        parser.add_argument("-archive",type= int, default=50, help="Maximum archive size")
194        parser.add_argument("-normalize",type= str, default= "max",
195                            help="What normalization to use for the dissimilarity matrix: max (default}, sum, or none")
196        parser.add_argument("-knn_niching",type= int, default=5,
197                        help="The number of nearest neighbors for local novelty/niching. If knn==0, global is performed. Default: 5")
198        parser.add_argument("-knn_nslc",type= int, default=5,
199                        help="The number of nearest neighbors for NSLC. If knn==0, global is performed. Default: 5")
200        return parser
201       
202    @abstractmethod
203    def dissimilarity(self, population: list):
204        pass
Note: See TracBrowser for help on using the repository browser.