Module ortools_utils.explainability.negative_explanations

Expand source code
from multiprocessing import Process, Queue
from typing import *
import logging

from ortools.sat.python import cp_model

from ortools_utils.model_indexation.constraints import NamedBlock, UsualConstraint
from .constraints_relaxation import ConstraintsRelaxation
from .consistency import is_consistent

# Module-level logger used by every method of NegativeExplanation.
logger = logging.getLogger(__name__)
# NOTE(review): this configures the root logger as a side effect of importing the module
# (basicConfig does nothing when the root logger already has handlers). Libraries usually
# leave logging configuration to the application - confirm this is intended.
logging.basicConfig(level=logging.WARNING)


class NegativeExplanation:
    """
    Class for generating the explanation of an unsolvable model.
    """

    def __init__(
            self,
            model,
            split_choice=3,
            nb_iis_granularite=1
    ):
        """
        Set up the explanation engine for one model.

        :param model: model to explain; it must expose ``dict_indexing()`` and is also
                      handed to ConstraintsRelaxation
        :param split_choice: strategy passed to ``split`` whenever quick_xplain cuts a
                             list of blocks in two
        :param nb_iis_granularite: tolerance used by the zoom loop when it compares the
                                   size of a refined IIS with the size of the current one
        """
        self.model = model
        self.dict_indexing = model.dict_indexing()
        self.nb_iis_granularite = nb_iis_granularite
        self.split_choice = split_choice

        self.constraints_relaxation = ConstraintsRelaxation(model)
        self.solver = cp_model.CpSolver()

        # Bookkeeping for the assumption variables temporarily added to the model
        self.index_var_assumption_to_block: Dict[int, NamedBlock] = {}
        self.block_to_index_var_assumption: Dict[NamedBlock, int] = {}
        self.list_idx_usual_constraints_with_assumptions = []
        self.list_idx_constant_constraints_with_assumptions = []
        self.variables_ajoutees = []

        # IIS collected by the find_set_of_distinct_iis_* methods
        self.list_already_found_iis = []

    def is_infeasible(self) -> bool:
        """
        Tell whether the model has no solution.

        The objective is removed from the model proto first (the model is modified in
        place), then consistency is checked with the shared solver.

        :return: True when ``is_consistent`` reports the model as not consistent
        """
        proto = self.model.Proto()
        proto.ClearField("objective")
        consistent = is_consistent(self.model, self.solver)
        return not consistent

    def find_one_iis_combined(self) -> [NamedBlock]:
        """
        Race QuickXplain against Sufficient Assumption and return the first answer.

        Each method runs in its own child process (not a thread). As soon as one of them
        delivers a result, both processes are terminated and joined.

        NOTE(review): the worker class and the two tasks are local objects; this
        presumably relies on the 'fork' start method, since local objects cannot be
        pickled for 'spawn' - confirm on the target platforms.

        :return: the value produced by the fastest worker (a list of NamedBlock, or None
                 when that worker found the model feasible)
        :raises RuntimeError: if every worker stops without delivering a result
        """
        from queue import Empty  # raised by Queue.get(timeout=...) when nothing arrived in time

        def thread_qx():
            logger.debug('Launching QuickXplain')
            res = self.find_one_iis_auto_zoom_quick_xplain()
            logger.debug('QuickXplain is done')
            return res

        def thread_sa():
            logger.debug('Launching Sufficient Assumption')
            res = self.find_one_iis_sufficient_assumption()
            logger.debug('Sufficient Assumption is done')
            return res

        class CustomProcess(Process):
            """Process that runs a task and pushes its return value on a queue."""

            def __init__(self, tache, queue_de_retour):
                Process.__init__(self)
                self.tache = tache
                self.queue_de_retour = queue_de_retour

            def run(self):
                resultat = self.tache()
                self.queue_de_retour.put(resultat)

        queue_pour_le_resultat = Queue()
        liste_de_process = [CustomProcess(tache, queue_pour_le_resultat) for tache in [thread_qx, thread_sa]]
        process_demarres = []

        try:
            for process in liste_de_process:
                process.start()
                process_demarres.append(process)

            while True:
                try:
                    return queue_pour_le_resultat.get(timeout=1)
                except Empty:
                    # Keep waiting only while at least one worker can still answer;
                    # a worker that crashed never puts anything on the queue.
                    if any(process.is_alive() for process in process_demarres):
                        continue

                # Every worker is gone: read one last time in case a result was
                # enqueued just before the last worker exited.
                try:
                    return queue_pour_le_resultat.get(timeout=1)
                except Empty:
                    raise RuntimeError(
                        'QuickXplain and Sufficient Assumption both stopped without returning a result'
                    ) from None
        finally:
            # Only started processes can be terminated; join them so that no
            # zombie process is left behind.
            for process in process_demarres:
                process.terminate()
            for process in process_demarres:
                process.join()

    def find_one_iis_sufficient_assumption(self) -> [NamedBlock]:
        """
        Finds one IIS with the Sufficient Assumption way
        Also uses a little Quick Xplain because SA does not handle constant constraints
        """
        if not self.is_infeasible():
            logger.info('Problem is solvable')
            return None

        mon_iis_uc = self._find_one_iis_and_zoom('sufficient_assumption')
        if not mon_iis_uc:
            mon_iis_uc = []

        logger.info('IIS found with sufficient assumption :')
        logger.info(mon_iis_uc)
        logger.info('Reducing the IIS with quick xplain... ')

        self.constraints_relaxation.relax_all_real_constraints_but(mon_iis_uc)

        # Finds one IIS with sufficient assumption
        mes_blocks = mon_iis_uc + [b for b in self.dict_indexing.return_all_blocks_as_named() if not b.is_true_constraint()]
        mon_iis_complet = self._find_one_iis_and_zoom('quick_xplain', mes_blocks)

        logger.info('Final IIS:')
        logger.info(mon_iis_complet)

        return mon_iis_complet

    def _find_one_iis_and_zoom(self, method: str, all_blocks=None) -> [NamedBlock]:
        """
        Find one IIS on self.model (some constraints may already be relaxed) and refine it.

        A first IIS is searched among ``all_blocks``. Then, repeatedly, one block of the
        current IIS is split on its next dimension and the search is run again on the
        pieces plus the other blocks of the IIS. A refinement is rejected (and the
        dimension removed from the block) when the new IIS has more than
        ``len(current IIS) + self.nb_iis_granularite - 1`` blocks. The loop stops when no
        refinement is kept.

        NOTE(review): the original docstring states that, when this function returns, the
        relaxed blocks are exactly the same as at the start. This relies on the final
        ``inv_relax_all()`` call; the early return below does not go through it - confirm.

        :param method: 'quick_xplain' or 'sufficient_assumption', forwarded to find_iis_single_call
        :param all_blocks: blocks to search in; when falsy (None or empty), all the named
                           blocks of the indexing are used
        :return: the refined IIS, or [] when the first search finds nothing
        """
        if not all_blocks:
            all_blocks = self.dict_indexing.return_all_blocks_as_named()

        # Initial granularity
        current_iis = self.find_iis_single_call(method, all_blocks)  # List of NamedBlock
        if len(current_iis) == 0:
            logger.info('There is a problem within the background blocks')
            return []

        # Relax once and for all everything that is not in the IIS
        self.constraints_relaxation.relax_all_but(current_iis, method)

        logger.info('Large IIS: ')
        logger.info(current_iis)

        while True:
            longueur_iis = len(current_iis)
            nouv_iis = None
            new_blocks = None
            for nb in current_iis:
                # `not new_blocks` prevents any further split once a refinement has been kept
                while nb.can_be_split() and not new_blocks:
                    list_nb = nb.split_on_next_dimension()
                    new_blocks = list_nb + [b for b in current_iis if b != nb]
                    logger.debug('New blocks : {}'.format(new_blocks))
                    nouv_iis = self.find_iis_single_call(method, new_blocks)
                    logger.debug('New-scale IIS : {}'.format(nouv_iis))
                    if len(nouv_iis) > longueur_iis + self.nb_iis_granularite - 1:
                        # The refined IIS grew too much: discard it and try the next dimension
                        logger.debug('This dimension is not relevant')
                        nb.remove_dim_to_split()
                        nouv_iis = None
                        new_blocks = None
            if not nouv_iis:
                break
            else:
                current_iis = nouv_iis
                self.constraints_relaxation.relax_all_but(current_iis, method)
                logger.info('Current IIS: ')
                logger.info(current_iis)

        logger.info('Final IIS:')
        logger.info(current_iis)
        self.constraints_relaxation.inv_relax_all()
        return current_iis

    def find_one_iis_auto_zoom_quick_xplain(self) -> [NamedBlock]:
        """
        Finds one IIS by zooming automatically with QX method
        Starts by finding MacroBlocks that are in conflict and then zooms in progressively
        """
        if not self.is_infeasible():
            logger.info('Problem is feasible')
            return None

        return self._find_one_iis_and_zoom('quick_xplain')

    def find_set_of_distinct_iis_combined(self) -> [[NamedBlock]]:
        """
        Race the QuickXplain and Sufficient Assumption searches for a set of distinct IIS.

        Each method runs in its own child process (not a thread). As soon as one of them
        delivers a result, both processes are terminated and joined.

        NOTE(review): the worker class and the two tasks are local objects; this
        presumably relies on the 'fork' start method, since local objects cannot be
        pickled for 'spawn' - confirm on the target platforms.

        :return: the value produced by the fastest worker (a list of IIS, or None when
                 that worker found the model feasible)
        :raises RuntimeError: if every worker stops without delivering a result
        """
        from queue import Empty  # raised by Queue.get(timeout=...) when nothing arrived in time

        def thread_qx():
            logger.debug('Lauching QuickXplain')
            res = self.find_set_of_distinct_iis_qx()
            logger.debug('QuickXplain is done')
            return res

        def thread_sa():
            logger.debug('Launching Sufficient Assumption')
            res = self.find_set_of_distinct_iis_sa()
            logger.debug('Sufficient Assumption is done')
            return res

        class CustomProcess(Process):
            """Process that runs a task and pushes its return value on a queue."""

            def __init__(self, tache, queue_de_retour):
                Process.__init__(self)
                self.tache = tache
                self.queue_de_retour = queue_de_retour

            def run(self):
                resultat = self.tache()
                self.queue_de_retour.put(resultat)

        queue_pour_le_resultat = Queue()
        liste_de_process = [CustomProcess(tache, queue_pour_le_resultat) for tache in [thread_qx, thread_sa]]
        process_demarres = []

        try:
            for process in liste_de_process:
                process.start()
                process_demarres.append(process)

            while True:
                try:
                    return queue_pour_le_resultat.get(timeout=1)
                except Empty:
                    # Keep waiting only while at least one worker can still answer;
                    # a worker that crashed never puts anything on the queue.
                    if any(process.is_alive() for process in process_demarres):
                        continue

                # Every worker is gone: read one last time in case a result was
                # enqueued just before the last worker exited.
                try:
                    return queue_pour_le_resultat.get(timeout=1)
                except Empty:
                    raise RuntimeError(
                        'QuickXplain and Sufficient Assumption both stopped without returning a result'
                    ) from None
        finally:
            # Only started processes can be terminated; join them so that no
            # zombie process is left behind.
            for process in process_demarres:
                process.terminate()
            for process in process_demarres:
                process.join()

    def find_set_of_distinct_iis_qx(self) -> [[NamedBlock]]:
        """
        Collect several IIS with the QuickXplain method.

        Every IIS already found is relaxed before the next search, so that the following
        IIS is looked for among the remaining constraints. The search stops when the model
        becomes consistent or when an empty IIS is returned.

        :return: None when the model is feasible, otherwise the list of IIS found
        """
        if not self.is_infeasible():
            return None

        found_iis = []
        self.list_already_found_iis = []

        while True:
            logger.info('Searching for a new IIS... ')

            # Relax the constraints of the IIS found so far
            for previous in found_iis:
                self.constraints_relaxation.relax_list_blocks(previous)

            # Stop as soon as no conflict is left
            if is_consistent(self.model, self.solver):
                logger.info('No more IIS')
                break

            candidate = self._find_one_iis_and_zoom('quick_xplain')
            if len(candidate) == 0:
                break

            found_iis.append(candidate)
            self.list_already_found_iis.append(candidate)

        self.constraints_relaxation.inv_relax_all()
        return found_iis

    def find_set_of_distinct_iis_sa(self) -> [[NamedBlock]]:
        """
        Collect several IIS with Sufficient Assumption followed by a QuickXplain pass.

        Every IIS already found is relaxed before the next search, so that the following
        IIS is looked for among the remaining constraints. The search stops when the model
        becomes consistent or when the QuickXplain pass returns an empty IIS.

        :return: None when the model is feasible, otherwise the list of IIS found
        """
        if not self.is_infeasible():
            return None

        found_iis = []
        self.list_already_found_iis = []

        while True:
            logger.info('Searching for a new IIS... ')

            # Relax the constraints of the IIS found so far
            for previous in found_iis:
                self.constraints_relaxation.relax_list_blocks(previous)

            # Stop as soon as no conflict is left
            if is_consistent(self.model, self.solver):
                logger.info('No more IIS')
                break

            iis_usual = self._find_one_iis_and_zoom('sufficient_assumption')

            # Relax the already found IIS again after the first pass
            for previous in found_iis:
                self.constraints_relaxation.relax_list_blocks(previous)

            logger.debug('IIS found with sufficient assumption:')
            logger.debug(iis_usual)

            if not iis_usual:
                iis_usual = []

            self.constraints_relaxation.relax_all_real_constraints_but(iis_usual)

            # Second pass with QuickXplain on the first IIS plus the non "true constraint" blocks
            other_blocks = [
                block
                for block in self.dict_indexing.return_all_blocks_as_named()
                if not block.is_true_constraint()
            ]
            full_iis = self._find_one_iis_and_zoom('quick_xplain', iis_usual + other_blocks)

            logger.debug('Final IIS:')
            logger.debug(full_iis)

            if len(full_iis) == 0:
                break

            found_iis.append(full_iis)
            self.list_already_found_iis.append(full_iis)

        self.constraints_relaxation.inv_relax_all()
        return found_iis

    def split(self, C: List[NamedBlock], choice: int):
        # Attention à la méthode de découpage
        # S'il n'y a pas au moins un block dans chaque partie on va boucler à l'infini
        """
        Function that split the list C of constraint's blocks into two list of constraint's blocks
        Beware if implementing new splitting methods. If the method occasionally return one empty list, quick_xplain will enter an infinite loop
        :param C: (List[Block]) the list of constraint's blocks
        :param choice: (int) 1 to "isolate" the first block
                              2 to split "by half"
                              3 to split in two and balance the size of both parts
        :return: C_1, C_2: (List[Block]) two lists of constraint's blocks
        """
        if choice == 1:
            return [C[0]], C[1:]
        if choice == 2:
            k = int(len(C) / 2)
            return C[:k], C[k:]
        if choice == 3:
            nb_blocs = len(C)
            if nb_blocs < 2:
                return self.split(C, 2)

            mes_tailles = [b.size() for b in C]
            max_taille = sum(x for x in mes_tailles)

            """On fait un mini problème d'optimisation pour découper notre problème en deux parties égales"""
            model = cp_model.CpModel()
            solver2 = cp_model.CpSolver()
            solver2.parameters.max_time_in_seconds = 60
            affectations = {j: model.NewBoolVar("affect_{}".format(j)) for j in range(nb_blocs)}
            ecart_optimal = model.NewIntVar(0, max_taille, 'bloc_min')
            model.Add((2 * sum([affectations[j] * mes_tailles[j] for j in range(nb_blocs)]) - max_taille) <= ecart_optimal)
            model.Add((max_taille - 2 * sum([affectations[j] * mes_tailles[j] for j in range(nb_blocs)])) <= ecart_optimal)
            model.Minimize(ecart_optimal)
            res = solver2.Solve(model)

            if res == cp_model.OPTIMAL:
                bloc_1 = [C[j] for j in range(nb_blocs) if solver2.Value(affectations[j])]
                bloc_2 = [C[j] for j in range(nb_blocs) if not solver2.Value(affectations[j])]
                if len(bloc_2) == 0 or len(bloc_1) == 0:
                    return self.split(C, 2)
                return bloc_1, bloc_2
            else:
                return self.split(C, 2)

    def add_var_assumptions(self, all_blocks):
        """
        Attach one assumption variable to every "true constraint" block of ``all_blocks``.

        For each such block a boolean variable is created (once per block) and registered
        in the model assumptions; it is then appended as enforcement literal to every
        usual constraint of the block that is neither listed by
        ``get_list_usual_inamovible()`` nor currently relaxed.

        :param all_blocks: blocks to process
        :raises IndexError: if a constraint index already carries an assumption variable
        """
        for block in all_blocks:
            if not block.is_true_constraint():
                continue

            nom_block = block.name()
            block_constraints = block.contenu()

            if block not in self.block_to_index_var_assumption:
                b_var = self.model.NewBoolVar(nom_block)
                b_var_ix = b_var.Index()
                self.variables_ajoutees.append(b_var_ix)
                self.index_var_assumption_to_block[b_var_ix] = block
                self.block_to_index_var_assumption[block] = b_var_ix
                self.model.Proto().assumptions.append(b_var_ix)

            for single_constraint in block_constraints:
                if not isinstance(single_constraint, UsualConstraint):
                    continue

                for idx in single_constraint.list_idx():
                    if idx in self.list_idx_usual_constraints_with_assumptions:
                        logger.error("Trying to assign an assumption variable to a constraint which already has one")
                        raise IndexError

                    inamovible = idx in self.dict_indexing.get_list_usual_inamovible()
                    if inamovible or self.constraints_relaxation.is_relaxed_usual_constraint[single_constraint]:
                        continue

                    literal = self.block_to_index_var_assumption[block]
                    self.model.Proto().constraints[idx].enforcement_literal.append(literal)
                    self.list_idx_usual_constraints_with_assumptions.append(idx)

    def inv_add_var_assumptions(self):
        """Function that removes ALL assumption variables"""
        # On enlève les variables assumptions présentes devant les contraintes
        for idx_constraints in self.list_idx_usual_constraints_with_assumptions:
            self.model.Proto().constraints[idx_constraints].enforcement_literal.pop(0)

        self.model.Proto().assumptions[:] = []

        # On supprime les variables d'assumption
        for ind in sorted(self.variables_ajoutees, reverse=True):
            self.model.Proto().variables.pop(ind)

        # On remet à jour notre "mémoire"
        self.index_var_assumption_to_block = dict()
        self.block_to_index_var_assumption = dict()
        self.list_idx_usual_constraints_with_assumptions = []
        self.variables_ajoutees = []

    def sufficient_assumption(self, all_blocks) -> Union[List[NamedBlock], None]:
        """
        Return an IIS computed from the OR-tools field "sufficient_assumptions_for_infeasibility".

        Assumption variables are added to the model for the duration of the call and are
        always removed before returning, even when the solve or the lookup raises.

        NOTE(review): the shared solver is switched to a single search worker and is not
        restored afterwards - confirm this is intended for later solves.

        :param all_blocks: blocks that receive an assumption variable
        :return: explanations: (List[Block]) the blocks corresponding to an IIS, or None
                 when the model is consistent
        """
        logger.info("Launching sufficient_assumption")
        try:
            self.add_var_assumptions(all_blocks)
            self.solver.parameters.num_search_workers = 1

            # Check that there is still at least one conflict
            if is_consistent(self.model, self.solver):
                return None

            assumptions = self.solver.ResponseProto().sufficient_assumptions_for_infeasibility
            explanations = {self.index_var_assumption_to_block[index_b_var] for index_b_var in assumptions}
            return list(explanations)
        finally:
            # Leave the model without assumption variables whatever happened above
            self.inv_add_var_assumptions()

    def quick_xplain(self, all_blocks) -> Union[List[NamedBlock], None]:
        """
        Function that implements the QuickXplain's algorithm

        The constraints that are already relaxed when the function is called are recorded
        and passed to every ``inv_relax_list_blocks`` call, so that reactivating blocks
        does not reactivate them.

        :param all_blocks: blocks among which the IIS is searched
        :return: result: (List[Block]) a list of blocks corresponding to an IIS;
                 None when the model is consistent on entry; [] when all_blocks is empty
        """
        logger.info("Launching quick_xplain")

        # list_of_usual_constraints_already_released = copy(self.constraints_relaxation.is_relaxed_usual_constraint)
        # list_of_constant_constraints_already_released = copy(self.constraints_relaxation.is_relaxed_constant)

        # Snapshot of what is relaxed on entry (only the entries whose flag is truthy)
        list_of_usual_constraints_already_released = {x: True for x in self.constraints_relaxation.is_relaxed_usual_constraint if self.constraints_relaxation.is_relaxed_usual_constraint[x]}
        list_of_constant_constraints_already_released = {x: True for x in self.constraints_relaxation.is_relaxed_constant if self.constraints_relaxation.is_relaxed_constant[x]}

        logger.debug('Relaxed constraints :')
        logger.debug(list(list_of_usual_constraints_already_released.keys()) + ['c_{}'.format(cons) for cons in list_of_constant_constraints_already_released.keys()])

        def QX(delta_bool: bool, remaining_constraints: List[NamedBlock], i_rec=1) -> List[NamedBlock]:
            """
            Core of the recursive function of QuickXplain's algorithm

            The blocks of ``remaining_constraints`` are expected to be relaxed on entry;
            every block activated during the call is relaxed again before returning.

            :param delta_bool: (bool) True if a constraint has just been activated, or False
            :param remaining_constraints: (List[Block]) remaining constraints for IIS research
            :param i_rec: (int) recursion depth, only used to indent the debug logs
            :return: delta_1 + delta_2: (List[Block]) a list of blocks corresponding to an IIS
            """
            logger.debug("\t" * i_rec + "QX: remaining constraints %s" % remaining_constraints)

            # The solver is only called when something was activated since the last check.
            # If the model is already infeasible, nothing from remaining_constraints is needed.
            if delta_bool and not is_consistent(self.model, self.solver):
                logger.debug("\t" * i_rec + "QX: infeasible")
                return []
            if len(remaining_constraints) == 1:
                logger.debug("\t" * i_rec + "QX: only one remaining constraint")
                return remaining_constraints

            # NOTE(review): this message is also logged when delta_bool is False, i.e.
            # when no consistency check was run in this call.
            logger.debug("\t" * i_rec + "QX: feasible")

            C_1, C_2 = self.split(remaining_constraints, choice=self.split_choice)
            logger.debug("\t" * i_rec + "C1: %s, C2: %s" % (C_1, C_2))

            # Step 1: with C_1 active, find the part of the conflict located in C_2
            self.constraints_relaxation.inv_relax_list_blocks(C_1, list_usual_idx_not_to_release=list_of_usual_constraints_already_released,
                                                              list_constant_idx_not_to_release=list_of_constant_constraints_already_released)
            logger.debug("\t"*i_rec + "QX: Activate C1 : %s" % C_1)
            logger.debug("\t"*i_rec + "QX: Searches in C_2 after having reactivated C_1 : %s" % C_1)
            delta_2 = QX(delta_bool=True, remaining_constraints=C_2, i_rec=i_rec + 1)
            logger.debug("\t"*i_rec + "QX: Delta_2 : %s" % delta_2)
            self.constraints_relaxation.relax_list_blocks(C_1)
            logger.debug("\t"*i_rec + "QX: Deactivate C1 : %s" % C_1)

            # Step 2: find the part of the conflict located in C_1, with delta_2 active
            # when it is not empty
            if delta_2 and len(delta_2) > 0:
                logger.debug("\t"*i_rec + "QX: Activate Delta_2 : %s" % delta_2)
                self.constraints_relaxation.inv_relax_list_blocks(delta_2, list_usual_idx_not_to_release=list_of_usual_constraints_already_released,
                                                                  list_constant_idx_not_to_release=list_of_constant_constraints_already_released)
                logger.debug("\t"*i_rec + "QX: Searches in C_1 after having activated D2")
                delta_1 = QX(delta_bool=True, remaining_constraints=C_1, i_rec=i_rec + 1)
                logger.debug("\t" * i_rec + "QX: Delta_1 : %s" % delta_1)

                logger.debug("\t"*i_rec + "QX: Deactivate Delta_2 : %s" % delta_2)
                self.constraints_relaxation.relax_list_blocks(delta_2)
            else:
                # Nothing was activated, so the recursive call skips the consistency check
                logger.debug("\t"*i_rec + "QX: Searches in C_1")
                delta_1 = QX(delta_bool=False, remaining_constraints=C_1, i_rec=i_rec + 1)
                logger.debug("\t" * i_rec + "QX: Delta_1 : %s" % delta_1)

            logger.debug("\t"*i_rec + "QX: Delta_1 + Delta_2 : %s" % (delta_1 + delta_2))

            return delta_1 + delta_2

        # QuickXplain only makes sense on a model that is infeasible to begin with
        if is_consistent(self.model, self.solver):
            logger.error('Quick Xplain was called but model is feasible')
            return None

        # all_blocks = [block for block in self.dict_indexing.splits_on_granularity()]

        logger.debug('List of blocks:')
        logger.debug(all_blocks)

        if len(all_blocks) == 0:
            return []

        # Relax every candidate block, run the recursion, then reactivate the blocks
        # (except what was already relaxed on entry)
        logger.debug("QX: Deactivates all constraints: %s" % all_blocks)
        self.constraints_relaxation.relax_list_blocks(all_blocks)
        result = QX(delta_bool=True, remaining_constraints=all_blocks)
        logger.debug("QX: Reactivates all constraints: %s" % all_blocks)
        self.constraints_relaxation.inv_relax_list_blocks(all_blocks,
                                                          list_usual_idx_not_to_release=list_of_usual_constraints_already_released,
                                                          list_constant_idx_not_to_release=list_of_constant_constraints_already_released)
        return result

    def find_iis_single_call(self, method: str, all_blocks) -> Union[List[NamedBlock], None]:
        """
        Function that searches for an IIS according to the chosen method (quick_xplain or sufficient_assumption)
        :return: result: (List[NamedBlock]) a list of blocks corresponding to an IIS
        """
        result = None
        if method == "quick_xplain":
            result = self.quick_xplain(all_blocks)
            logger.info("End of quick_xplain")
        elif method == "sufficient_assumption":
            result = self.sufficient_assumption(all_blocks)
            logger.info("End of sufficient_assumption")

        return sorted(result)

Classes

class NegativeExplanation (model, split_choice=3, nb_iis_granularite=1)

Class for generating the explanation of an unsolvable model.

Expand source code
class NegativeExplanation:
    """
    Class for generating the explanation of an unsolvable model.
    """

    def __init__(
            self,
            model,
            split_choice=3,
            nb_iis_granularite=1
    ):
        """
        Set up the explanation engine for one model.

        :param model: model to explain; it must expose ``dict_indexing()`` and is also
                      handed to ConstraintsRelaxation
        :param split_choice: strategy passed to ``split`` whenever quick_xplain cuts a
                             list of blocks in two
        :param nb_iis_granularite: tolerance used by the zoom loop when it compares the
                                   size of a refined IIS with the size of the current one
        """
        self.model = model
        self.dict_indexing = model.dict_indexing()
        self.nb_iis_granularite = nb_iis_granularite
        self.split_choice = split_choice

        self.constraints_relaxation = ConstraintsRelaxation(model)
        self.solver = cp_model.CpSolver()

        # Bookkeeping for the assumption variables temporarily added to the model
        self.index_var_assumption_to_block: Dict[int, NamedBlock] = {}
        self.block_to_index_var_assumption: Dict[NamedBlock, int] = {}
        self.list_idx_usual_constraints_with_assumptions = []
        self.list_idx_constant_constraints_with_assumptions = []
        self.variables_ajoutees = []

        # IIS collected by the find_set_of_distinct_iis_* methods
        self.list_already_found_iis = []

    def is_infeasible(self) -> bool:
        """
        Tell whether the model has no solution.

        The objective is removed from the model proto first (the model is modified in
        place), then consistency is checked with the shared solver.

        :return: True when ``is_consistent`` reports the model as not consistent
        """
        proto = self.model.Proto()
        proto.ClearField("objective")
        consistent = is_consistent(self.model, self.solver)
        return not consistent

    def find_one_iis_combined(self) -> [NamedBlock]:
        """
        Race QuickXplain against Sufficient Assumption and return the first answer.

        Each method runs in its own child process (not a thread). As soon as one of them
        delivers a result, both processes are terminated and joined.

        NOTE(review): the worker class and the two tasks are local objects; this
        presumably relies on the 'fork' start method, since local objects cannot be
        pickled for 'spawn' - confirm on the target platforms.

        :return: the value produced by the fastest worker (a list of NamedBlock, or None
                 when that worker found the model feasible)
        :raises RuntimeError: if every worker stops without delivering a result
        """
        from queue import Empty  # raised by Queue.get(timeout=...) when nothing arrived in time

        def thread_qx():
            logger.debug('Launching QuickXplain')
            res = self.find_one_iis_auto_zoom_quick_xplain()
            logger.debug('QuickXplain is done')
            return res

        def thread_sa():
            logger.debug('Launching Sufficient Assumption')
            res = self.find_one_iis_sufficient_assumption()
            logger.debug('Sufficient Assumption is done')
            return res

        class CustomProcess(Process):
            """Process that runs a task and pushes its return value on a queue."""

            def __init__(self, tache, queue_de_retour):
                Process.__init__(self)
                self.tache = tache
                self.queue_de_retour = queue_de_retour

            def run(self):
                resultat = self.tache()
                self.queue_de_retour.put(resultat)

        queue_pour_le_resultat = Queue()
        liste_de_process = [CustomProcess(tache, queue_pour_le_resultat) for tache in [thread_qx, thread_sa]]
        process_demarres = []

        try:
            for process in liste_de_process:
                process.start()
                process_demarres.append(process)

            while True:
                try:
                    return queue_pour_le_resultat.get(timeout=1)
                except Empty:
                    # Keep waiting only while at least one worker can still answer;
                    # a worker that crashed never puts anything on the queue.
                    if any(process.is_alive() for process in process_demarres):
                        continue

                # Every worker is gone: read one last time in case a result was
                # enqueued just before the last worker exited.
                try:
                    return queue_pour_le_resultat.get(timeout=1)
                except Empty:
                    raise RuntimeError(
                        'QuickXplain and Sufficient Assumption both stopped without returning a result'
                    ) from None
        finally:
            # Only started processes can be terminated; join them so that no
            # zombie process is left behind.
            for process in process_demarres:
                process.terminate()
            for process in process_demarres:
                process.join()

    def find_one_iis_sufficient_assumption(self) -> [NamedBlock]:
        """
        Compute one IIS with Sufficient Assumption, then refine it with QuickXplain.

        The second pass exists because, according to the original author, Sufficient
        Assumption does not handle constant constraints: QuickXplain is run on the blocks
        found by the first pass plus every block that is not a "true constraint".

        :return: None when the model is feasible, otherwise the list of blocks returned by
                 the QuickXplain pass
        """
        if not self.is_infeasible():
            logger.info('Problem is solvable')
            return None

        iis_usual = self._find_one_iis_and_zoom('sufficient_assumption') or []

        logger.info('IIS found with sufficient assumption :')
        logger.info(iis_usual)
        logger.info('Reducing the IIS with quick xplain... ')

        self.constraints_relaxation.relax_all_real_constraints_but(iis_usual)

        # Second pass with QuickXplain on the first IIS plus the non "true constraint" blocks
        other_blocks = [
            block
            for block in self.dict_indexing.return_all_blocks_as_named()
            if not block.is_true_constraint()
        ]
        full_iis = self._find_one_iis_and_zoom('quick_xplain', iis_usual + other_blocks)

        logger.info('Final IIS:')
        logger.info(full_iis)

        return full_iis

    def _find_one_iis_and_zoom(self, method: str, all_blocks=None) -> [NamedBlock]:
        """
        Find one IIS on self.model (some constraints may already be relaxed) and refine it.

        A first IIS is searched among ``all_blocks``. Then, repeatedly, one block of the
        current IIS is split on its next dimension and the search is run again on the
        pieces plus the other blocks of the IIS. A refinement is rejected (and the
        dimension removed from the block) when the new IIS has more than
        ``len(current IIS) + self.nb_iis_granularite - 1`` blocks. The loop stops when no
        refinement is kept.

        NOTE(review): the original docstring states that, when this function returns, the
        relaxed blocks are exactly the same as at the start. This relies on the final
        ``inv_relax_all()`` call; the early return below does not go through it - confirm.

        :param method: 'quick_xplain' or 'sufficient_assumption', forwarded to find_iis_single_call
        :param all_blocks: blocks to search in; when falsy (None or empty), all the named
                           blocks of the indexing are used
        :return: the refined IIS, or [] when the first search finds nothing
        """
        if not all_blocks:
            all_blocks = self.dict_indexing.return_all_blocks_as_named()

        # Initial granularity
        current_iis = self.find_iis_single_call(method, all_blocks)  # List of NamedBlock
        if len(current_iis) == 0:
            logger.info('There is a problem within the background blocks')
            return []

        # Relax once and for all everything that is not in the IIS
        self.constraints_relaxation.relax_all_but(current_iis, method)

        logger.info('Large IIS: ')
        logger.info(current_iis)

        while True:
            longueur_iis = len(current_iis)
            nouv_iis = None
            new_blocks = None
            for nb in current_iis:
                # `not new_blocks` prevents any further split once a refinement has been kept
                while nb.can_be_split() and not new_blocks:
                    list_nb = nb.split_on_next_dimension()
                    new_blocks = list_nb + [b for b in current_iis if b != nb]
                    logger.debug('New blocks : {}'.format(new_blocks))
                    nouv_iis = self.find_iis_single_call(method, new_blocks)
                    logger.debug('New-scale IIS : {}'.format(nouv_iis))
                    if len(nouv_iis) > longueur_iis + self.nb_iis_granularite - 1:
                        # The refined IIS grew too much: discard it and try the next dimension
                        logger.debug('This dimension is not relevant')
                        nb.remove_dim_to_split()
                        nouv_iis = None
                        new_blocks = None
            if not nouv_iis:
                break
            else:
                current_iis = nouv_iis
                self.constraints_relaxation.relax_all_but(current_iis, method)
                logger.info('Current IIS: ')
                logger.info(current_iis)

        logger.info('Final IIS:')
        logger.info(current_iis)
        self.constraints_relaxation.inv_relax_all()
        return current_iis

    def find_one_iis_auto_zoom_quick_xplain(self) -> [NamedBlock]:
        """
        Compute one IIS with the QuickXplain method, zooming automatically.

        The search starts from the coarsest blocks in conflict and refines them step by
        step (see ``_find_one_iis_and_zoom``).

        :return: None when the model is feasible, otherwise the IIS as a list of blocks
        """
        if self.is_infeasible():
            return self._find_one_iis_and_zoom('quick_xplain')

        logger.info('Problem is feasible')
        return None

    def find_set_of_distinct_iis_combined(self) -> [[NamedBlock]]:
        """
        Race the QuickXplain and Sufficient Assumption searches for a set of distinct IIS.

        Each method runs in its own child process (not a thread). As soon as one of them
        delivers a result, both processes are terminated and joined.

        NOTE(review): the worker class and the two tasks are local objects; this
        presumably relies on the 'fork' start method, since local objects cannot be
        pickled for 'spawn' - confirm on the target platforms.

        :return: the value produced by the fastest worker (a list of IIS, or None when
                 that worker found the model feasible)
        :raises RuntimeError: if every worker stops without delivering a result
        """
        from queue import Empty  # raised by Queue.get(timeout=...) when nothing arrived in time

        def thread_qx():
            logger.debug('Lauching QuickXplain')
            res = self.find_set_of_distinct_iis_qx()
            logger.debug('QuickXplain is done')
            return res

        def thread_sa():
            logger.debug('Launching Sufficient Assumption')
            res = self.find_set_of_distinct_iis_sa()
            logger.debug('Sufficient Assumption is done')
            return res

        class CustomProcess(Process):
            """Process that runs a task and pushes its return value on a queue."""

            def __init__(self, tache, queue_de_retour):
                Process.__init__(self)
                self.tache = tache
                self.queue_de_retour = queue_de_retour

            def run(self):
                resultat = self.tache()
                self.queue_de_retour.put(resultat)

        queue_pour_le_resultat = Queue()
        liste_de_process = [CustomProcess(tache, queue_pour_le_resultat) for tache in [thread_qx, thread_sa]]
        process_demarres = []

        try:
            for process in liste_de_process:
                process.start()
                process_demarres.append(process)

            while True:
                try:
                    return queue_pour_le_resultat.get(timeout=1)
                except Empty:
                    # Keep waiting only while at least one worker can still answer;
                    # a worker that crashed never puts anything on the queue.
                    if any(process.is_alive() for process in process_demarres):
                        continue

                # Every worker is gone: read one last time in case a result was
                # enqueued just before the last worker exited.
                try:
                    return queue_pour_le_resultat.get(timeout=1)
                except Empty:
                    raise RuntimeError(
                        'QuickXplain and Sufficient Assumption both stopped without returning a result'
                    ) from None
        finally:
            # Only started processes can be terminated; join them so that no
            # zombie process is left behind.
            for process in process_demarres:
                process.terminate()
            for process in process_demarres:
                process.join()

    def find_set_of_distinct_iis_qx(self) -> [[NamedBlock]]:
        """
        Collect several IIS with the QuickXplain method.

        Every IIS already found is relaxed before the next search, so that the following
        IIS is looked for among the remaining constraints. The search stops when the model
        becomes consistent or when an empty IIS is returned.

        :return: None when the model is feasible, otherwise the list of IIS found
        """
        if not self.is_infeasible():
            return None

        found_iis = []
        self.list_already_found_iis = []

        while True:
            logger.info('Searching for a new IIS... ')

            # Relax the constraints of the IIS found so far
            for previous in found_iis:
                self.constraints_relaxation.relax_list_blocks(previous)

            # Stop as soon as no conflict is left
            if is_consistent(self.model, self.solver):
                logger.info('No more IIS')
                break

            candidate = self._find_one_iis_and_zoom('quick_xplain')
            if len(candidate) == 0:
                break

            found_iis.append(candidate)
            self.list_already_found_iis.append(candidate)

        self.constraints_relaxation.inv_relax_all()
        return found_iis

    def find_set_of_distinct_iis_sa(self) -> [[NamedBlock]]:
        """
        Collect several IIS with Sufficient Assumption followed by a QuickXplain pass.

        Every IIS already found is relaxed before the next search, so that the following
        IIS is looked for among the remaining constraints. The search stops when the model
        becomes consistent or when the QuickXplain pass returns an empty IIS.

        :return: None when the model is feasible, otherwise the list of IIS found
        """
        if not self.is_infeasible():
            return None

        found_iis = []
        self.list_already_found_iis = []

        while True:
            logger.info('Searching for a new IIS... ')

            # Relax the constraints of the IIS found so far
            for previous in found_iis:
                self.constraints_relaxation.relax_list_blocks(previous)

            # Stop as soon as no conflict is left
            if is_consistent(self.model, self.solver):
                logger.info('No more IIS')
                break

            iis_usual = self._find_one_iis_and_zoom('sufficient_assumption')

            # Relax the already found IIS again after the first pass
            for previous in found_iis:
                self.constraints_relaxation.relax_list_blocks(previous)

            logger.debug('IIS found with sufficient assumption:')
            logger.debug(iis_usual)

            if not iis_usual:
                iis_usual = []

            self.constraints_relaxation.relax_all_real_constraints_but(iis_usual)

            # Second pass with QuickXplain on the first IIS plus the non "true constraint" blocks
            other_blocks = [
                block
                for block in self.dict_indexing.return_all_blocks_as_named()
                if not block.is_true_constraint()
            ]
            full_iis = self._find_one_iis_and_zoom('quick_xplain', iis_usual + other_blocks)

            logger.debug('Final IIS:')
            logger.debug(full_iis)

            if len(full_iis) == 0:
                break

            found_iis.append(full_iis)
            self.list_already_found_iis.append(full_iis)

        self.constraints_relaxation.inv_relax_all()
        return found_iis

    def split(self, C: List[NamedBlock], choice: int):
        """
        Function that splits the list C of constraint's blocks into two lists of constraint's blocks
        Beware if implementing new splitting methods. If the method occasionally returns one empty list, quick_xplain will enter an infinite loop
        :param C: (List[Block]) the list of constraint's blocks
        :param choice: (int) 1 to "isolate" the first block
                              2 to split "by half"
                              3 to split in two and balance the size of both parts
        :return: C_1, C_2: (List[Block]) two lists of constraint's blocks
        :raises ValueError: if choice is not 1, 2 or 3
        """
        if choice == 1:
            return [C[0]], C[1:]
        if choice == 2:
            k = len(C) // 2
            return C[:k], C[k:]
        if choice != 3:
            # Falling through would return None and make the caller fail on tuple unpacking
            raise ValueError("Unknown split choice: {!r} (expected 1, 2 or 3)".format(choice))

        nb_blocs = len(C)
        if nb_blocs < 2:
            return self.split(C, 2)

        mes_tailles = [b.size() for b in C]
        max_taille = sum(mes_tailles)

        # Small optimisation problem: choose the blocks of the first part so that
        # the total sizes of the two parts are as close as possible
        model = cp_model.CpModel()
        solver2 = cp_model.CpSolver()
        solver2.parameters.max_time_in_seconds = 60
        affectations = {j: model.NewBoolVar("affect_{}".format(j)) for j in range(nb_blocs)}
        ecart_optimal = model.NewIntVar(0, max_taille, 'bloc_min')
        taille_bloc_1 = sum(affectations[j] * mes_tailles[j] for j in range(nb_blocs))
        # ecart_optimal >= |size(part 1) - size(part 2)|
        model.Add((2 * taille_bloc_1 - max_taille) <= ecart_optimal)
        model.Add((max_taille - 2 * taille_bloc_1) <= ecart_optimal)
        model.Minimize(ecart_optimal)
        res = solver2.Solve(model)

        # A solution found at the time limit (FEASIBLE) is still a valid partition
        if res in (cp_model.OPTIMAL, cp_model.FEASIBLE):
            bloc_1 = [C[j] for j in range(nb_blocs) if solver2.Value(affectations[j])]
            bloc_2 = [C[j] for j in range(nb_blocs) if not solver2.Value(affectations[j])]
            if bloc_1 and bloc_2:
                return bloc_1, bloc_2

        # No usable balanced partition: fall back to the split by half
        return self.split(C, 2)

    def add_var_assumptions(self, all_blocks):
        """
        Gives an assumption variable to each block of all_blocks that is a true constraint

        A new boolean variable is created for a block that has none yet and is registered in the
        model assumptions. It is then added as enforcement literal on the usual constraints of the
        block, skipping those that are immovable or already relaxed.
        :param all_blocks: the blocks to process
        :raises IndexError: if a constraint index already carries an assumption variable
        """
        for block in all_blocks:
            if not block.is_true_constraint():
                continue

            nom_block = block.name()
            block_constraints = block.contenu()

            if block not in self.block_to_index_var_assumption:
                assumption_idx = self.model.NewBoolVar(nom_block).Index()
                self.variables_ajoutees.append(assumption_idx)
                self.index_var_assumption_to_block[assumption_idx] = block
                self.block_to_index_var_assumption[block] = assumption_idx
                self.model.Proto().assumptions.append(assumption_idx)

            for single_constraint in block_constraints:
                if not isinstance(single_constraint, UsualConstraint):
                    continue

                for idx in single_constraint.list_idx():
                    if idx in self.list_idx_usual_constraints_with_assumptions:
                        logger.error("Trying to assign an assumption variable to a constraint which already has one")
                        raise IndexError

                    est_inamovible = idx in self.dict_indexing.get_list_usual_inamovible()
                    if est_inamovible or self.constraints_relaxation.is_relaxed_usual_constraint[single_constraint]:
                        continue

                    literal = self.block_to_index_var_assumption[block]
                    self.model.Proto().constraints[idx].enforcement_literal.append(literal)
                    self.list_idx_usual_constraints_with_assumptions.append(idx)

    def inv_add_var_assumptions(self):
        """
        Function that removes ALL assumption variables

        Undoes add_var_assumptions: removes the assumption literal of every recorded constraint,
        empties the model assumptions, deletes the added variables and resets the bookkeeping.
        """
        # Remove the assumption literal of each constraint. add_var_assumptions appends it at the
        # END of enforcement_literal, so the last element is removed: popping the first one would
        # drop a literal that was already on the constraint.
        for idx_constraints in self.list_idx_usual_constraints_with_assumptions:
            self.model.Proto().constraints[idx_constraints].enforcement_literal.pop()

        self.model.Proto().assumptions[:] = []

        # Delete the assumption variables, highest index first so that the remaining indices stay valid
        for ind in sorted(self.variables_ajoutees, reverse=True):
            self.model.Proto().variables.pop(ind)

        # Reset our "memory"
        self.index_var_assumption_to_block = dict()
        self.block_to_index_var_assumption = dict()
        self.list_idx_usual_constraints_with_assumptions = []
        self.variables_ajoutees = []

    def sufficient_assumption(self, all_blocks) -> Union[List[NamedBlock], None]:
        """
        Function that returns an iis with the Or-tools function : "sufficient_assumptions_for_infeasibility"
        :param all_blocks: the blocks that receive an assumption variable for the search
        :return: explanations: (List[Block]) a list of blocks corresponding to an IIS,
                 or None if the model is consistent
        """
        logger.info("Launching sufficient_assumption")
        try:
            self.add_var_assumptions(all_blocks)
            # NOTE(review): presumably needed for the solver to report sufficient assumptions - confirm.
            # The setting stays on self.solver after this call.
            self.solver.parameters.num_search_workers = 1

            # Check that at least one conflict remains
            if is_consistent(self.model, self.solver):
                return None

            assumptions = self.solver.ResponseProto().sufficient_assumptions_for_infeasibility
            # The blocks are resolved before the cleanup below resets the index -> block mapping
            explanations = set(self.index_var_assumption_to_block[index_b_var] for index_b_var in assumptions)
            return list(explanations)
        finally:
            # Always give the model back without assumption variables, even if the solve raised
            self.inv_add_var_assumptions()

    def quick_xplain(self, all_blocks) -> Union[List[NamedBlock], None]:
        """
        Function that implements the QuickXplain's algorithm
        :param all_blocks: (List[Block]) the blocks among which the IIS is searched
        :return: result: (List[Block]) a list of blocks corresponding to an IIS,
                 [] if all_blocks is empty, None if the model is consistent when the function is called
        """
        logger.info("Launching quick_xplain")

        # Snapshot of what is relaxed on entry. It is passed to every inv_relax_list_blocks call below
        # as the "not to release" lists (presumably so that these stay relaxed - confirm in ConstraintsRelaxation).
        list_of_usual_constraints_already_released = {x: True for x in self.constraints_relaxation.is_relaxed_usual_constraint if self.constraints_relaxation.is_relaxed_usual_constraint[x]}
        list_of_constant_constraints_already_released = {x: True for x in self.constraints_relaxation.is_relaxed_constant if self.constraints_relaxation.is_relaxed_constant[x]}

        logger.debug('Relaxed constraints :')
        logger.debug(list(list_of_usual_constraints_already_released.keys()) + ['c_{}'.format(cons) for cons in list_of_constant_constraints_already_released.keys()])

        def QX(delta_bool: bool, remaining_constraints: List[NamedBlock], i_rec=1) -> List[NamedBlock]:
            """
            Core of the recursive function of QuickXplain's algorithm
            :param delta_bool: (bool) True if a constraint has just been activated, or False
            :param remaining_constraints: (List[Block]) remaining constraints for IIS research
            :param i_rec: (int) recursion depth, only used to indent the debug logs
            :return: delta_1 + delta_2: (List[Block]) a list of blocks corresponding to an IIS
            """
            logger.debug("\t" * i_rec + "QX: remaining constraints %s" % remaining_constraints)

            # The model is only solved when something was activated since the last check.
            # If it is already inconsistent, none of the remaining constraints is needed.
            if delta_bool and not is_consistent(self.model, self.solver):
                logger.debug("\t" * i_rec + "QX: infeasible")
                return []
            if len(remaining_constraints) == 1:
                logger.debug("\t" * i_rec + "QX: only one remaining constraint")
                return remaining_constraints

            logger.debug("\t" * i_rec + "QX: feasible")

            C_1, C_2 = self.split(remaining_constraints, choice=self.split_choice)
            logger.debug("\t" * i_rec + "C1: %s, C2: %s" % (C_1, C_2))

            # Step 1: with C_1 active, search the part of the conflict that lies in C_2
            self.constraints_relaxation.inv_relax_list_blocks(C_1, list_usual_idx_not_to_release=list_of_usual_constraints_already_released,
                                                              list_constant_idx_not_to_release=list_of_constant_constraints_already_released)
            logger.debug("\t"*i_rec + "QX: Activate C1 : %s" % C_1)
            logger.debug("\t"*i_rec + "QX: Searches in C_2 after having reactivated C_1 : %s" % C_1)
            delta_2 = QX(delta_bool=True, remaining_constraints=C_2, i_rec=i_rec + 1)
            logger.debug("\t"*i_rec + "QX: Delta_2 : %s" % delta_2)
            self.constraints_relaxation.relax_list_blocks(C_1)
            logger.debug("\t"*i_rec + "QX: Deactivate C1 : %s" % C_1)

            # Step 2: with delta_2 active (when it is not empty), search the part of the conflict that lies in C_1
            if delta_2 and len(delta_2) > 0:
                logger.debug("\t"*i_rec + "QX: Activate Delta_2 : %s" % delta_2)
                self.constraints_relaxation.inv_relax_list_blocks(delta_2, list_usual_idx_not_to_release=list_of_usual_constraints_already_released,
                                                                  list_constant_idx_not_to_release=list_of_constant_constraints_already_released)
                logger.debug("\t"*i_rec + "QX: Searches in C_1 after having activated D2")
                delta_1 = QX(delta_bool=True, remaining_constraints=C_1, i_rec=i_rec + 1)
                logger.debug("\t" * i_rec + "QX: Delta_1 : %s" % delta_1)

                logger.debug("\t"*i_rec + "QX: Deactivate Delta_2 : %s" % delta_2)
                self.constraints_relaxation.relax_list_blocks(delta_2)
            else:
                # Nothing was activated, so the recursive call skips its consistency check
                logger.debug("\t"*i_rec + "QX: Searches in C_1")
                delta_1 = QX(delta_bool=False, remaining_constraints=C_1, i_rec=i_rec + 1)
                logger.debug("\t" * i_rec + "QX: Delta_1 : %s" % delta_1)

            logger.debug("\t"*i_rec + "QX: Delta_1 + Delta_2 : %s" % (delta_1 + delta_2))

            return delta_1 + delta_2

        # An explanation only exists if the model is inconsistent
        if is_consistent(self.model, self.solver):
            logger.error('Quick Xplain was called but model is feasible')
            return None

        logger.debug('List of blocks:')
        logger.debug(all_blocks)

        if len(all_blocks) == 0:
            return []

        # Relax every candidate block, run the recursion, then reactivate the blocks
        logger.debug("QX: Deactivates all constraints: %s" % all_blocks)
        self.constraints_relaxation.relax_list_blocks(all_blocks)
        result = QX(delta_bool=True, remaining_constraints=all_blocks)
        logger.debug("QX: Reactivates all constraints: %s" % all_blocks)
        self.constraints_relaxation.inv_relax_list_blocks(all_blocks,
                                                          list_usual_idx_not_to_release=list_of_usual_constraints_already_released,
                                                          list_constant_idx_not_to_release=list_of_constant_constraints_already_released)
        return result

    def find_iis_single_call(self, method: str, all_blocks) -> Union[List[NamedBlock], None]:
        """
        Function that searches for an IIS according to the chosen method (quick_xplain or sufficient_assumption)
        :param method: (str) "quick_xplain" or "sufficient_assumption"
        :param all_blocks: the blocks handed to the chosen search
        :return: result: (List[NamedBlock]) a sorted list of blocks corresponding to an IIS,
                 or None if the search returned None or the method is unknown
        """
        result = None
        if method == "quick_xplain":
            result = self.quick_xplain(all_blocks)
            logger.info("End of quick_xplain")
        elif method == "sufficient_assumption":
            result = self.sufficient_assumption(all_blocks)
            logger.info("End of sufficient_assumption")

        # Both searches may return None, and sorted(None) would raise a TypeError
        if result is None:
            return None
        return sorted(result)

Methods

def add_var_assumptions(self, all_blocks)

Function that adds assumption variables

Expand source code
def add_var_assumptions(self, all_blocks):
    """
    Gives an assumption variable to each block of all_blocks that is a true constraint

    A new boolean variable is created for a block that has none yet and is registered in the
    model assumptions. It is then added as enforcement literal on the usual constraints of the
    block, skipping those that are immovable or already relaxed.
    :param all_blocks: the blocks to process
    :raises IndexError: if a constraint index already carries an assumption variable
    """
    for block in all_blocks:
        if not block.is_true_constraint():
            continue

        nom_block = block.name()
        block_constraints = block.contenu()

        if block not in self.block_to_index_var_assumption:
            assumption_idx = self.model.NewBoolVar(nom_block).Index()
            self.variables_ajoutees.append(assumption_idx)
            self.index_var_assumption_to_block[assumption_idx] = block
            self.block_to_index_var_assumption[block] = assumption_idx
            self.model.Proto().assumptions.append(assumption_idx)

        for single_constraint in block_constraints:
            if not isinstance(single_constraint, UsualConstraint):
                continue

            for idx in single_constraint.list_idx():
                if idx in self.list_idx_usual_constraints_with_assumptions:
                    logger.error("Trying to assign an assumption variable to a constraint which already has one")
                    raise IndexError

                est_inamovible = idx in self.dict_indexing.get_list_usual_inamovible()
                if est_inamovible or self.constraints_relaxation.is_relaxed_usual_constraint[single_constraint]:
                    continue

                literal = self.block_to_index_var_assumption[block]
                self.model.Proto().constraints[idx].enforcement_literal.append(literal)
                self.list_idx_usual_constraints_with_assumptions.append(idx)
def find_iis_single_call(self, method: str, all_blocks) ‑> Optional[List[NamedBlock]]

Function that searches for an IIS according to the chosen method (quick_xplain or sufficient_assumption) :return: result: (List[NamedBlock]) a list of blocks corresponding to an IIS

Expand source code
def find_iis_single_call(self, method: str, all_blocks) -> Union[List[NamedBlock], None]:
    """
    Function that searches for an IIS according to the chosen method (quick_xplain or sufficient_assumption)
    :param method: (str) "quick_xplain" or "sufficient_assumption"
    :param all_blocks: the blocks handed to the chosen search
    :return: result: (List[NamedBlock]) a sorted list of blocks corresponding to an IIS,
             or None if the search returned None or the method is unknown
    """
    result = None
    if method == "quick_xplain":
        result = self.quick_xplain(all_blocks)
        logger.info("End of quick_xplain")
    elif method == "sufficient_assumption":
        result = self.sufficient_assumption(all_blocks)
        logger.info("End of sufficient_assumption")

    # Both searches may return None, and sorted(None) would raise a TypeError
    if result is None:
        return None
    return sorted(result)
def find_one_iis_auto_zoom_quick_xplain(self) ‑> [NamedBlock]

Finds one IIS by zooming automatically with QX method Starts by finding MacroBlocks that are in conflict and then zooms in progressively

Expand source code
def find_one_iis_auto_zoom_quick_xplain(self) -> [NamedBlock]:
    """
    Searches one IIS with the QX method, zooming automatically
    The search starts from the MacroBlocks that are in conflict and then zooms in progressively
    Returns None when the problem is feasible
    """
    if self.is_infeasible():
        return self._find_one_iis_and_zoom('quick_xplain')

    logger.info('Problem is feasible')
    return None
def find_one_iis_combined(self) ‑> [NamedBlock]

Launches QuickXplain and Sufficient Assumption in two separate processes. Returns the first result.

Expand source code
def find_one_iis_combined(self) -> [NamedBlock]:
    """Launches QuickXplain and Sufficient Assumption in two separate processes
    Returns the first result

    :raises RuntimeError: if every worker process fails before producing a result
    """

    def thread_qx():
        logger.debug('Launching QuickXplain')
        res = self.find_one_iis_auto_zoom_quick_xplain()
        logger.debug('QuickXplain is done')
        return res

    def thread_sa():
        logger.debug('Launching Sufficient Assumption')
        res = self.find_one_iis_sufficient_assumption()
        logger.debug('Sufficient Assumption is done')
        return res

    class CustomProcess(Process):
        """Process that runs one search and reports its outcome on a queue"""

        def __init__(self, tache, queue_de_retour):
            Process.__init__(self)
            self.tache = tache
            self.queue_de_retour = queue_de_retour

        def run(self):
            # Always report an outcome as (success, result): a worker that dies silently
            # would leave the parent waiting on the queue forever
            try:
                resultat = self.tache()
            except Exception:
                logger.exception('IIS search failed in a worker process')
                self.queue_de_retour.put((False, None))
            else:
                self.queue_de_retour.put((True, resultat))

    queue_pour_le_resultat = Queue()
    liste_de_process = [CustomProcess(tache, queue_pour_le_resultat) for tache in [thread_qx, thread_sa]]

    for process in liste_de_process:
        process.start()

    try:
        # Each worker sends one message: return on the first success
        for _ in liste_de_process:
            succes, resultat = queue_pour_le_resultat.get()
            if succes:
                return resultat
        raise RuntimeError('QuickXplain and Sufficient Assumption both failed')
    finally:
        for process in liste_de_process:
            process.terminate()
        # Wait for the terminated children so that they are reaped
        for process in liste_de_process:
            process.join()
def find_one_iis_sufficient_assumption(self) ‑> [NamedBlock]

Finds one IIS with the Sufficient Assumption way Also uses a little Quick Xplain because SA does not handle constant constraints

Expand source code
def find_one_iis_sufficient_assumption(self) -> [NamedBlock]:
    """
    Searches one IIS starting from Sufficient Assumption
    The SA result is then handed to Quick Xplain, because SA does not handle constant constraints
    Returns None when the problem is solvable
    """
    if not self.is_infeasible():
        logger.info('Problem is solvable')
        return None

    # A falsy SA result is treated as "no candidate"
    iis_sa = self._find_one_iis_and_zoom('sufficient_assumption') or []

    logger.info('IIS found with sufficient assumption :')
    logger.info(iis_sa)
    logger.info('Reducing the IIS with quick xplain... ')

    self.constraints_relaxation.relax_all_real_constraints_but(iis_sa)

    # Quick Xplain runs on the SA candidate plus the blocks that are not true constraints
    blocs_hors_contraintes = [
        bloc
        for bloc in self.dict_indexing.return_all_blocks_as_named()
        if not bloc.is_true_constraint()
    ]
    iis_final = self._find_one_iis_and_zoom('quick_xplain', iis_sa + blocs_hors_contraintes)

    logger.info('Final IIS:')
    logger.info(iis_final)

    return iis_final
def find_set_of_distinct_iis_combined(self) ‑> [[NamedBlock]]

Launches QuickXplain and Sufficient Assumption in two separate processes. Returns the first result.

Expand source code
def find_set_of_distinct_iis_combined(self) -> [[NamedBlock]]:
    """Launches QuickXplain and Sufficient Assumption in two separate processes
    Returns the first result

    :raises RuntimeError: if every worker process fails before producing a result
    """

    def thread_qx():
        logger.debug('Lauching QuickXplain')
        res = self.find_set_of_distinct_iis_qx()
        logger.debug('QuickXplain is done')
        return res

    def thread_sa():
        logger.debug('Launching Sufficient Assumption')
        res = self.find_set_of_distinct_iis_sa()
        logger.debug('Sufficient Assumption is done')
        return res

    class CustomProcess(Process):
        """Process that runs one search and reports its outcome on a queue"""

        def __init__(self, tache, queue_de_retour):
            Process.__init__(self)
            self.tache = tache
            self.queue_de_retour = queue_de_retour

        def run(self):
            # Always report an outcome as (success, result): a worker that dies silently
            # would leave the parent waiting on the queue forever
            try:
                resultat = self.tache()
            except Exception:
                logger.exception('IIS search failed in a worker process')
                self.queue_de_retour.put((False, None))
            else:
                self.queue_de_retour.put((True, resultat))

    queue_pour_le_resultat = Queue()
    liste_de_process = [CustomProcess(tache, queue_pour_le_resultat) for tache in [thread_qx, thread_sa]]

    for process in liste_de_process:
        process.start()

    try:
        # Each worker sends one message: return on the first success
        for _ in liste_de_process:
            succes, resultat = queue_pour_le_resultat.get()
            if succes:
                return resultat
        raise RuntimeError('QuickXplain and Sufficient Assumption both failed')
    finally:
        for process in liste_de_process:
            process.terminate()
        # Wait for the terminated children so that they are reaped
        for process in liste_de_process:
            process.join()
def find_set_of_distinct_iis_qx(self) ‑> [[NamedBlock]]

Returns a set of IIS that do not intersect each other, with QX method

Expand source code
def find_set_of_distinct_iis_qx(self) -> [[NamedBlock]]:
    """
    Returns a list of IIS that do not intersect each other, with QX method

    Every IIS found is relaxed before the next search, and the loop stops as soon as the
    model becomes consistent or the search yields nothing.
    :return: (List[List[NamedBlock]]) the IIS found, or None if the model is feasible from the start
    """
    if not self.is_infeasible():
        return None

    mes_iis = []
    self.list_already_found_iis = []

    while True:
        logger.info('Searching for a new IIS... ')

        # Relax the constraints of the IIS already found
        for iis in mes_iis:
            self.constraints_relaxation.relax_list_blocks(iis)

        # Check that at least one conflict remains
        if is_consistent(self.model, self.solver):
            logger.info('No more IIS')
            break

        nouvel_iis = self._find_one_iis_and_zoom('quick_xplain')
        # Truthiness covers the empty list and also a None result, on which len() would raise
        if not nouvel_iis:
            break

        mes_iis.append(nouvel_iis)
        self.list_already_found_iis.append(nouvel_iis)

    self.constraints_relaxation.inv_relax_all()
    return mes_iis
def find_set_of_distinct_iis_sa(self) ‑> [[NamedBlock]]

Returns a set of IIS that do not intersect each other, with SA method

Expand source code
def find_set_of_distinct_iis_sa(self) -> [[NamedBlock]]:
    """
    Returns a list of IIS that do not intersect each other, with SA method

    Each round asks sufficient assumption for a candidate, then runs quick xplain on that
    candidate together with the blocks that are not true constraints.
    :return: (List[List[NamedBlock]]) the IIS found, or None if the model is feasible from the start
    """
    if not self.is_infeasible():
        return None

    mes_iis = []
    self.list_already_found_iis = []

    while True:
        logger.info('Searching for a new IIS... ')

        # Relax the constraints of the IIS already found
        for iis in mes_iis:
            self.constraints_relaxation.relax_list_blocks(iis)

        # Check that at least one conflict remains
        if is_consistent(self.model, self.solver):
            logger.info('No more IIS')
            break

        mon_iis_uc = self._find_one_iis_and_zoom('sufficient_assumption')

        # Relax the constraints of the IIS already found once more, after the SA search
        for iis in mes_iis:
            self.constraints_relaxation.relax_list_blocks(iis)

        logger.debug('IIS found with sufficient assumption:')
        logger.debug(mon_iis_uc)

        if not mon_iis_uc:
            mon_iis_uc = []

        self.constraints_relaxation.relax_all_real_constraints_but(mon_iis_uc)

        # Reduce the SA candidate with quick xplain, adding the blocks that are not true constraints
        mes_blocks = mon_iis_uc + [b for b in self.dict_indexing.return_all_blocks_as_named() if not b.is_true_constraint()]
        mon_iis_complet = self._find_one_iis_and_zoom('quick_xplain', mes_blocks)

        logger.debug('Final IIS:')
        logger.debug(mon_iis_complet)

        # Truthiness covers the empty list and also a None result, on which len() would raise
        if not mon_iis_complet:
            break

        mes_iis.append(mon_iis_complet)
        self.list_already_found_iis.append(mon_iis_complet)

    self.constraints_relaxation.inv_relax_all()
    return mes_iis
def inv_add_var_assumptions(self)

Function that removes ALL assumption variables

Expand source code
def inv_add_var_assumptions(self):
    """
    Function that removes ALL assumption variables

    Undoes add_var_assumptions: removes the assumption literal of every recorded constraint,
    empties the model assumptions, deletes the added variables and resets the bookkeeping.
    """
    # Remove the assumption literal of each constraint. add_var_assumptions appends it at the
    # END of enforcement_literal, so the last element is removed: popping the first one would
    # drop a literal that was already on the constraint.
    for idx_constraints in self.list_idx_usual_constraints_with_assumptions:
        self.model.Proto().constraints[idx_constraints].enforcement_literal.pop()

    self.model.Proto().assumptions[:] = []

    # Delete the assumption variables, highest index first so that the remaining indices stay valid
    for ind in sorted(self.variables_ajoutees, reverse=True):
        self.model.Proto().variables.pop(ind)

    # Reset our "memory"
    self.index_var_assumption_to_block = dict()
    self.block_to_index_var_assumption = dict()
    self.list_idx_usual_constraints_with_assumptions = []
    self.variables_ajoutees = []
def is_infeasible(self) ‑> bool

Before searching for IIS, checks once and for all that the model is not feasible

Expand source code
def is_infeasible(self) -> bool:
    """Tells whether the model has no solution; the objective is first removed from the model proto"""
    proto = self.model.Proto()
    proto.ClearField("objective")
    consistent = is_consistent(self.model, self.solver)
    return not consistent
def quick_xplain(self, all_blocks) ‑> Optional[List[NamedBlock]]

Function that implements the QuickXplain's algorithm :return: result: (List[Block]) a list of blocks corresponding to an IIS

Expand source code
def quick_xplain(self, all_blocks) -> Union[List[NamedBlock], None]:
    """
    Function that implements the QuickXplain's algorithm
    :param all_blocks: (List[Block]) the blocks among which the IIS is searched
    :return: result: (List[Block]) a list of blocks corresponding to an IIS,
             [] if all_blocks is empty, None if the model is consistent when the function is called
    """
    logger.info("Launching quick_xplain")

    # Snapshot of what is relaxed on entry. It is passed to every inv_relax_list_blocks call below
    # as the "not to release" lists (presumably so that these stay relaxed - confirm in ConstraintsRelaxation).
    list_of_usual_constraints_already_released = {x: True for x in self.constraints_relaxation.is_relaxed_usual_constraint if self.constraints_relaxation.is_relaxed_usual_constraint[x]}
    list_of_constant_constraints_already_released = {x: True for x in self.constraints_relaxation.is_relaxed_constant if self.constraints_relaxation.is_relaxed_constant[x]}

    logger.debug('Relaxed constraints :')
    logger.debug(list(list_of_usual_constraints_already_released.keys()) + ['c_{}'.format(cons) for cons in list_of_constant_constraints_already_released.keys()])

    def QX(delta_bool: bool, remaining_constraints: List[NamedBlock], i_rec=1) -> List[NamedBlock]:
        """
        Core of the recursive function of QuickXplain's algorithm
        :param delta_bool: (bool) True if a constraint has just been activated, or False
        :param remaining_constraints: (List[Block]) remaining constraints for IIS research
        :param i_rec: (int) recursion depth, only used to indent the debug logs
        :return: delta_1 + delta_2: (List[Block]) a list of blocks corresponding to an IIS
        """
        logger.debug("\t" * i_rec + "QX: remaining constraints %s" % remaining_constraints)

        # The model is only solved when something was activated since the last check.
        # If it is already inconsistent, none of the remaining constraints is needed.
        if delta_bool and not is_consistent(self.model, self.solver):
            logger.debug("\t" * i_rec + "QX: infeasible")
            return []
        if len(remaining_constraints) == 1:
            logger.debug("\t" * i_rec + "QX: only one remaining constraint")
            return remaining_constraints

        logger.debug("\t" * i_rec + "QX: feasible")

        C_1, C_2 = self.split(remaining_constraints, choice=self.split_choice)
        logger.debug("\t" * i_rec + "C1: %s, C2: %s" % (C_1, C_2))

        # Step 1: with C_1 active, search the part of the conflict that lies in C_2
        self.constraints_relaxation.inv_relax_list_blocks(C_1, list_usual_idx_not_to_release=list_of_usual_constraints_already_released,
                                                          list_constant_idx_not_to_release=list_of_constant_constraints_already_released)
        logger.debug("\t"*i_rec + "QX: Activate C1 : %s" % C_1)
        logger.debug("\t"*i_rec + "QX: Searches in C_2 after having reactivated C_1 : %s" % C_1)
        delta_2 = QX(delta_bool=True, remaining_constraints=C_2, i_rec=i_rec + 1)
        logger.debug("\t"*i_rec + "QX: Delta_2 : %s" % delta_2)
        self.constraints_relaxation.relax_list_blocks(C_1)
        logger.debug("\t"*i_rec + "QX: Deactivate C1 : %s" % C_1)

        # Step 2: with delta_2 active (when it is not empty), search the part of the conflict that lies in C_1
        if delta_2 and len(delta_2) > 0:
            logger.debug("\t"*i_rec + "QX: Activate Delta_2 : %s" % delta_2)
            self.constraints_relaxation.inv_relax_list_blocks(delta_2, list_usual_idx_not_to_release=list_of_usual_constraints_already_released,
                                                              list_constant_idx_not_to_release=list_of_constant_constraints_already_released)
            logger.debug("\t"*i_rec + "QX: Searches in C_1 after having activated D2")
            delta_1 = QX(delta_bool=True, remaining_constraints=C_1, i_rec=i_rec + 1)
            logger.debug("\t" * i_rec + "QX: Delta_1 : %s" % delta_1)

            logger.debug("\t"*i_rec + "QX: Deactivate Delta_2 : %s" % delta_2)
            self.constraints_relaxation.relax_list_blocks(delta_2)
        else:
            # Nothing was activated, so the recursive call skips its consistency check
            logger.debug("\t"*i_rec + "QX: Searches in C_1")
            delta_1 = QX(delta_bool=False, remaining_constraints=C_1, i_rec=i_rec + 1)
            logger.debug("\t" * i_rec + "QX: Delta_1 : %s" % delta_1)

        logger.debug("\t"*i_rec + "QX: Delta_1 + Delta_2 : %s" % (delta_1 + delta_2))

        return delta_1 + delta_2

    # An explanation only exists if the model is inconsistent
    if is_consistent(self.model, self.solver):
        logger.error('Quick Xplain was called but model is feasible')
        return None

    logger.debug('List of blocks:')
    logger.debug(all_blocks)

    if len(all_blocks) == 0:
        return []

    # Relax every candidate block, run the recursion, then reactivate the blocks
    logger.debug("QX: Deactivates all constraints: %s" % all_blocks)
    self.constraints_relaxation.relax_list_blocks(all_blocks)
    result = QX(delta_bool=True, remaining_constraints=all_blocks)
    logger.debug("QX: Reactivates all constraints: %s" % all_blocks)
    self.constraints_relaxation.inv_relax_list_blocks(all_blocks,
                                                      list_usual_idx_not_to_release=list_of_usual_constraints_already_released,
                                                      list_constant_idx_not_to_release=list_of_constant_constraints_already_released)
    return result
def split(self, C: List[NamedBlock], choice: int)

Function that split the list C of constraint's blocks into two list of constraint's blocks Beware if implementing new splitting methods. If the method occasionally return one empty list, quick_xplain will enter an infinite loop :param C: (List[Block]) the list of constraint's blocks :param choice: (int) 1 to "isolate" the first block 2 to split "by half" 3 to split in two and balance the size of both parts :return: C_1, C_2: (List[Block]) two lists of constraint's blocks

Expand source code
def split(self, C: List[NamedBlock], choice: int):
    """
    Function that splits the list C of constraint's blocks into two lists of constraint's blocks
    Beware if implementing new splitting methods. If the method occasionally returns one empty list, quick_xplain will enter an infinite loop
    :param C: (List[Block]) the list of constraint's blocks
    :param choice: (int) 1 to "isolate" the first block
                          2 to split "by half"
                          3 to split in two and balance the size of both parts
    :return: C_1, C_2: (List[Block]) two lists of constraint's blocks
    :raises ValueError: if choice is not 1, 2 or 3
    """
    if choice == 1:
        return [C[0]], C[1:]
    if choice == 2:
        k = len(C) // 2
        return C[:k], C[k:]
    if choice != 3:
        # Falling through would return None and make the caller fail on tuple unpacking
        raise ValueError("Unknown split choice: {!r} (expected 1, 2 or 3)".format(choice))

    nb_blocs = len(C)
    if nb_blocs < 2:
        return self.split(C, 2)

    mes_tailles = [b.size() for b in C]
    max_taille = sum(mes_tailles)

    # Small optimisation problem: choose the blocks of the first part so that
    # the total sizes of the two parts are as close as possible
    model = cp_model.CpModel()
    solver2 = cp_model.CpSolver()
    solver2.parameters.max_time_in_seconds = 60
    affectations = {j: model.NewBoolVar("affect_{}".format(j)) for j in range(nb_blocs)}
    ecart_optimal = model.NewIntVar(0, max_taille, 'bloc_min')
    taille_bloc_1 = sum(affectations[j] * mes_tailles[j] for j in range(nb_blocs))
    # ecart_optimal >= |size(part 1) - size(part 2)|
    model.Add((2 * taille_bloc_1 - max_taille) <= ecart_optimal)
    model.Add((max_taille - 2 * taille_bloc_1) <= ecart_optimal)
    model.Minimize(ecart_optimal)
    res = solver2.Solve(model)

    # A solution found at the time limit (FEASIBLE) is still a valid partition
    if res in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        bloc_1 = [C[j] for j in range(nb_blocs) if solver2.Value(affectations[j])]
        bloc_2 = [C[j] for j in range(nb_blocs) if not solver2.Value(affectations[j])]
        if bloc_1 and bloc_2:
            return bloc_1, bloc_2

    # No usable balanced partition: fall back to the split by half
    return self.split(C, 2)
def sufficient_assumption(self, all_blocks) ‑> Optional[List[NamedBlock]]

Function that returns an iis with the Or-tools function : "sufficient_assumptions_for_infeasibility" :return: explanations: (List[Block]) a list of blocks corresponding to an IIS

Expand source code
def sufficient_assumption(self, all_blocks) -> Union[List[NamedBlock], None]:
    """
    Function that returns an iis with the Or-tools function : "sufficient_assumptions_for_infeasibility"
    :param all_blocks: the blocks that receive an assumption variable for the search
    :return: explanations: (List[Block]) a list of blocks corresponding to an IIS,
             or None if the model is consistent
    """
    logger.info("Launching sufficient_assumption")
    try:
        self.add_var_assumptions(all_blocks)
        # NOTE(review): presumably needed for the solver to report sufficient assumptions - confirm.
        # The setting stays on self.solver after this call.
        self.solver.parameters.num_search_workers = 1

        # Check that at least one conflict remains
        if is_consistent(self.model, self.solver):
            return None

        assumptions = self.solver.ResponseProto().sufficient_assumptions_for_infeasibility
        # The blocks are resolved before the cleanup below resets the index -> block mapping
        explanations = set(self.index_var_assumption_to_block[index_b_var] for index_b_var in assumptions)
        return list(explanations)
    finally:
        # Always give the model back without assumption variables, even if the solve raised
        self.inv_add_var_assumptions()