Lokasi ngalangkungan proxy:   [ UP ]  
[Ngawartoskeun bug]   [Panyetelan cookie]                
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions pygad/islandga.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ def _replace_individuals(self, ga_instance, fitness_values, new_individuals, new

def _perform_migration(self):
"""执行迁移操作"""
all_migrants = []
from collections import defaultdict

migrants_by_destination = defaultdict(list)
modified_islands = set()

for island_idx, ga_instance in enumerate(self.islands):
Expand All @@ -248,24 +250,30 @@ def _perform_migration(self):
for dest_idx in destinations:
modified_islands.add(dest_idx)
for migrant_idx, migrant in enumerate(migrants):
all_migrants.append({
migrants_by_destination[dest_idx].append({
'source': island_idx,
'destination': dest_idx,
'individual': migrant.copy(),
'fitness': migrant_fitness[migrant_idx]
})

for migration in all_migrants:
dest_idx = migration['destination']
for dest_idx, incoming_migrants in migrants_by_destination.items():
dest_ga = self.islands[dest_idx]
dest_fitness = dest_ga.last_generation_fitness
dest_fitness = dest_ga.last_generation_fitness.copy()

self._replace_individuals(
dest_ga,
dest_fitness,
[migration['individual']],
[migration['fitness']]
)
new_individuals = [m['individual'] for m in incoming_migrants]
new_fitness = [m['fitness'] for m in incoming_migrants]

num_to_replace = min(len(new_individuals), len(dest_fitness))

if self.replacement == 'worst':
sorted_indices = numpy.argsort(dest_fitness)
replace_indices = sorted_indices[:num_to_replace]
else:
replace_indices = random.sample(range(len(dest_fitness)), num_to_replace)

for i, idx in enumerate(replace_indices):
if i < len(new_individuals):
dest_ga.population[idx] = new_individuals[i].copy()

for island_idx in modified_islands:
self.islands[island_idx].last_generation_fitness = self.islands[island_idx].cal_pop_fitness()
Expand Down
71 changes: 71 additions & 0 deletions tests/test_islandga.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,76 @@ def test_fully_connected_topology_paths():

print(" ✓ 全连接拓扑路径正确 (每个岛屿连接到所有其他岛屿)")

def test_multiple_sources_to_single_destination_no_overwrite():
"""
关键测试:验证全连接拓扑下多个源岛向同一个目标岛迁移时,
移民不会互相覆盖。

测试逻辑:
1. 创建 4 个岛屿,全连接拓扑
2. 在岛屿 0、1、2 中分别放入具有唯一标识的高适应度个体
3. 执行一次迁移,这 3 个岛的移民都会进入岛屿 3
4. 验证岛屿 3 的种群中确实存在这 3 个不同的移民,
而不是只保留最后到达的那一个
"""
print("\n测试: 多源到单目标迁移不覆盖...")

island_ga = pygad.IslandGA(
num_islands=4,
num_generations=5,
num_parents_mating=2,
fitness_func=fitness_func,
sol_per_pop=10,
num_genes=4,
migration_frequency=10,
num_migrants=1,
migration_topology='fully_connected',
migrant_selection='best',
replacement='worst',
random_seed=random_seed,
suppress_warnings=True
)

for ga_instance in island_ga.islands:
ga_instance.initialize_population(
allow_duplicate_genes=ga_instance.allow_duplicate_genes,
gene_type=ga_instance.gene_type,
gene_constraint=ga_instance.gene_constraint
)

unique_solution_0 = numpy.array([100.0, 0.0, 0.0, 0.0])
unique_solution_1 = numpy.array([0.0, 100.0, 0.0, 0.0])
unique_solution_2 = numpy.array([0.0, 0.0, 100.0, 0.0])

island_ga.islands[0].population[0] = unique_solution_0.copy()
island_ga.islands[1].population[0] = unique_solution_1.copy()
island_ga.islands[2].population[0] = unique_solution_2.copy()

for i in range(4):
island_ga.islands[i].last_generation_fitness = island_ga.islands[i].cal_pop_fitness()

island_3_before = [list(sol) for sol in island_ga.islands[3].population]

island_ga._perform_migration()

island_3_after = [list(sol) for sol in island_ga.islands[3].population]

found_0 = any(numpy.allclose(sol, unique_solution_0, atol=1e-10) for sol in island_ga.islands[3].population)
found_1 = any(numpy.allclose(sol, unique_solution_1, atol=1e-10) for sol in island_ga.islands[3].population)
found_2 = any(numpy.allclose(sol, unique_solution_2, atol=1e-10) for sol in island_ga.islands[3].population)

found_count = sum([found_0, found_1, found_2])

print(f" 岛屿 3 迁移前种群:\n {[list(s) for s in island_3_before[:3]]}...")
print(f" 岛屿 3 迁移后种群 (前5个):\n {[list(s) for s in island_ga.islands[3].population[:5]]}")
print(f" 找到的唯一移民: 岛屿0={found_0}, 岛屿1={found_1}, 岛屿2={found_2}")

assert found_count >= 2, \
f"全连接拓扑下 3 个源岛向目标岛迁移,至少应该找到 2 个不同的移民,但只找到 {found_count} 个。" \
f"这说明移民可能被覆盖了!"

print(" ✓ 多源到单目标迁移无覆盖问题")

if __name__ == "__main__":
print("=" * 60)
print("IslandGA 单元测试")
Expand All @@ -560,6 +630,7 @@ def test_fully_connected_topology_paths():
test_ring_topology_paths()
test_star_topology_paths()
test_fully_connected_topology_paths()
test_multiple_sources_to_single_destination_no_overwrite()

print("\n" + "=" * 60)
print("所有测试通过! ✓")
Expand Down