source: framspy/evolalg/base/experiment_niching_abc.py @ 1306

Last change on this file since 1306 was 1306, checked in by Maciej Komosinski, 13 months ago

Introduced symbolic names for dissimilarity estimation methods

File size: 10.9 KB
Line 
1import time
2from abc import ABC, abstractmethod
3
4import numpy as np
5from deap import base, tools
6from deap.tools.emo import assignCrowdingDist
7
8from ..constants import BAD_FITNESS
9from ..structures.individual import Individual
10from .experiment_abc import ExperimentABC
11from .remove_diagonal import remove_diagonal
12from FramsticksLib import DissimMethod  # since the descendant ExperimentFramsNiching class does not introduce any Framsticks-specific dissimilarity methods, all of them must be known here (in ExperimentNiching)
13
14
class DeapFitness(base.Fitness):
    """Two-objective fitness container built on DEAP's ``base.Fitness``.

    Instances are created in this module from a 2-tuple of objective values
    (see ``do_nsga2_dissim`` and ``do_nslc_dissim``).
    """

    # One weight per objective. Both are positive, which DEAP interprets as
    # "maximize this objective".
    weights = (1, 1)

    def __init__(self, *args, **kwargs):
        # Nothing to customize here; all arguments go straight to DEAP.
        super().__init__(*args, **kwargs)
20
21
22class ExperimentNiching(ExperimentABC, ABC):
23    fit: str = "niching"
24    normalize: str = "None"
25    archive_size: int = None
26
27    def __init__(self, fit, normalize, popsize, hof_size, save_only_best, knn_niching, knn_nslc, archive_size) -> None:
28        ExperimentABC.__init__(self,popsize=popsize, hof_size=hof_size, save_only_best=save_only_best)
29        self.fit = fit
30        self.normalize = normalize
31        self.knn_niching = knn_niching # this parameter is used for local novelty and local niching
32        self.knn_nslc = knn_nslc
33        self.archive_size=archive_size
34
35        # np.argpartition requires these parameters to be at most popsize-2; popsize is decreased by 1 because we remove_diagonal()
36        if self.knn_niching > popsize - 2:
37            raise ValueError("knn_niching (%d) should be at most popsize-2 (%d)" % (self.knn_niching, popsize-2))
38        if self.knn_nslc > popsize - 2:
39            raise ValueError("knn_nslc (%d) should be at most popsize-2 (%d)" % (self.knn_nslc, popsize-2))
40
41
42    def transform_indexes(self, i, index_array):
43        return [x+1 if x >= i else x for x in index_array]
44
45    def normalize_dissim(self, dissim_matrix):
46        dissim_matrix = remove_diagonal(np.array(dissim_matrix)) # on the diagonal we usually have zeros (an individual is identical with itself, so the dissimilarity is 0). In some techniques we need to find "k" most similar individuals, so we remove the diagonal so that self-similarity of individuals does not interfere with finding "k" other most similar individuals. The matrix from square n*n turns into n*(n-1).
47        if self.normalize == "none":
48            return dissim_matrix
49        elif self.normalize == "max":
50            divide_by = np.max(dissim_matrix)
51        elif self.normalize == "sum":
52            divide_by = np.sum(dissim_matrix)
53        else:
54            raise ValueError("Wrong normalization method: '%s'" % self.normalize)
55        if divide_by != 0:
56            return dissim_matrix/divide_by
57        else:
58            return dissim_matrix
59
60    def do_niching(self, population_structures):
61        population_archive = population_structures.population + population_structures.archive
62        dissim_matrix = self.dissimilarity(population_archive)
63        if "knn" not in self.fit:
64            dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
65        else:
66            dissim_list = np.mean(np.partition(
67                self.normalize_dissim(dissim_matrix), self.knn_niching)[:, :self.knn_niching], axis=1)
68
69        if Individual.fitness_set_negative_to_zero is False and ("niching" in self.fit or "novelty" in self.fit):
70                raise ValueError("Negative fitness values not tested in combination with niching or novelty. When using these techniques, verify formulas or consider using the flag -fitness_set_negative_to_zero") # once the formulas are verified/improved, the command-line flag and this conditional check can be removed.
71
72        if "niching" in self.fit:
73            for i, d in zip(population_archive, dissim_list):
74                i.fitness = i.rawfitness * d
75        elif "novelty" in self.fit:
76            for i, d in zip(population_archive, dissim_list):
77                i.fitness = d
78        else:
79            raise ValueError("Unsupported fit type: '%s'. Use the correct type or implement a new behavior." % self.fit)
80        population_structures.update_archive(dissim_matrix, population_archive)
81
82    def do_nsga2_dissim(self, population):
83        dissim_matrix = self.dissimilarity(population)
84        dissim_list = np.mean(self.normalize_dissim(dissim_matrix), axis=1)
85        for i, d in zip(population, dissim_list):
86            i.fitness = DeapFitness(tuple((d, i.rawfitness)))
87
88    def do_nslc_dissim(self, population_structures, pop_offspring):
89        population_archive = population_structures.archive + pop_offspring
90        dissim_matrix = self.dissimilarity(population_archive)
91        normalized_matrix = self.normalize_dissim(dissim_matrix)
92        for i in range(len(normalized_matrix)):
93            temp_dissim = normalized_matrix[i]
94            index_array = np.argpartition(temp_dissim, kth=self.knn_nslc, axis=-1)[:self.knn_nslc]
95            dissim_value = np.mean(np.take_along_axis(
96                temp_dissim, index_array, axis=-1))
97            temp_fitness = population_archive[i].rawfitness
98            population_of_most_similar = list(
99                map(population_archive.__getitem__, self.transform_indexes(i, index_array)))
100            temp_ind_fit = sum(
101                [1 for ind in population_of_most_similar if ind.rawfitness < temp_fitness])
102            population_archive[i].fitness = DeapFitness(
103                tuple((dissim_value, temp_ind_fit)))
104        population_structures.update_archive(dissim_matrix, population_archive)
105
106    def make_new_population_nsga2(self, population_structures, prob_mut, prob_xov):
107        expected_mut = int(self.popsize * prob_mut)
108        expected_xov = int(self.popsize * prob_xov)
109        assert expected_mut + expected_xov <= self.popsize, "If probabilities of mutation (%g) and crossover (%g) added together exceed 1.0, then the population would grow every generation..." % (prob_mut, prob_xov)
110        assignCrowdingDist(population_structures.population)
111        offspring = tools.selTournamentDCD(population_structures.population, self.popsize)
112
113        def addGenotypeIfValid(ind_list, genotype):
114            new_individual = Individual()
115            new_individual.set_and_evaluate(genotype, self.evaluate)
116            if new_individual.fitness is not BAD_FITNESS:
117                ind_list.append(new_individual)
118
119        counter = 0
120
121        def get_individual(pop, c):
122            if c < len(pop):
123                ind = pop[c]
124                c += 1
125                return ind, c
126            else:
127                c = 0
128                ind = pop[c]
129                c += 1
130                return ind, c
131
132        newpop = []
133        while len(newpop) < expected_mut:
134            ind, counter = get_individual(offspring, counter)
135            addGenotypeIfValid(newpop, self.mutate(ind.genotype))
136
137        # adding valid crossovers of selected individuals...
138        while len(newpop) < expected_mut + expected_xov:
139            ind1, counter = get_individual(offspring, counter)
140            ind2, counter = get_individual(offspring, counter)
141            addGenotypeIfValid(newpop, self.cross_over(ind1.genotype, ind2.genotype))
142
143        # select clones to fill up the new population until we reach the same size as the input population
144        while len(newpop) < len(population_structures.population):
145            ind, counter = get_individual(offspring, counter)
146            newpop.append(Individual().copyFrom(ind))
147
148        pop_offspring = population_structures.population + newpop # used both for nsga2 and nslc
149        # print(len(pop_offspring)) # for debugging
150        if self.fit == "nslc":
151            self.do_nslc_dissim(population_structures, pop_offspring)
152        elif self.fit == "nsga2":
153            self.do_nsga2_dissim(pop_offspring)
154        out_pop = tools.selNSGA2(pop_offspring, len(population_structures.population))
155        return out_pop
156
157    def evolve(self, hof_savefile, generations, initialgenotype, pmut, pxov, tournament_size):
158        file_name = self.get_state_filename(hof_savefile)
159        state = self.load_state(file_name)
160        if state is not None:  # loaded state from file
161            # saved generation has been completed, start with the next one
162            self.current_generation += 1
163            print("...Resuming from saved state: population size = %d, hof size = %d, stats size = %d, archive size = %d, generation = %d/%d" % (len(self.population_structures.population), len(self.hof), len(self.stats), (len(self.population_structures.archive)), self.current_generation, generations))  # self.current_generation (and g) are 0-based, parsed_args.generations is 1-based
164        else:
165            self.initialize_evolution(self.genformat, initialgenotype)
166
167        time0 = time.process_time()
168        for g in range(self.current_generation, generations):
169            if self.fit != "raw" and self.fit != "nsga2" and self.fit != "nslc":
170                self.do_niching(self.population_structures)
171
172            if type(self.population_structures.population[0].fitness) == DeapFitness:
173                self.population_structures.population = self.make_new_population_nsga2(  # used both for nsga2 and nslc
174                    self.population_structures, pmut, pxov)
175            else:
176                self.population_structures.population = self.make_new_population(
177                    self.population_structures.population, pmut, pxov, tournament_size)
178
179            self.update_stats(g, self.population_structures.population)
180
181            if hof_savefile is not None:
182                self.current_generation = g
183                self.time_elapsed += time.process_time() - time0
184                self.save_state(file_name)
185        if hof_savefile is not None:
186            self.save_genotypes(hof_savefile)
187        return self.population_structures.population, self.stats
188
189    @staticmethod
190    def get_args_for_parser():
191        parser = ExperimentABC.get_args_for_parser()
192        parser.add_argument("-dissim", type = lambda arg: DissimMethod[arg], choices = DissimMethod,
193                   default=DissimMethod.PHENE_STRUCT_OPTIM,
194                   help="Dissimilarity measure type. Available: " + str(DissimMethod._member_names_))
195        parser.add_argument("-fit",type= str, default="raw",
196                        help="Fitness type, availible types: niching, novelty, knn_niching (local), knn_novelty (local), nsga2, nslc and raw (default)")
197        parser.add_argument("-archive",type= int, default=50, help="Maximum archive size")
198        parser.add_argument("-normalize",type= str, default= "max",
199                            help="What normalization to use for the dissimilarity matrix: max (default}, sum, or none")
200        parser.add_argument("-knn_niching",type= int, default=5,
201                        help="The number of nearest neighbors for local novelty/niching. If knn==0, global is performed. Default: 5")
202        parser.add_argument("-knn_nslc",type= int, default=5,
203                        help="The number of nearest neighbors for NSLC. If knn==0, global is performed. Default: 5")
204        return parser
205       
206    @abstractmethod
207    def dissimilarity(self, population: list):
208        pass
Note: See TracBrowser for help on using the repository browser.