import os
import select
import sys
import traceback
from collections import defaultdict
from importlib import import_module

from django.apps import apps
from django.core.exceptions import AppRegistryNotReady
from django.core.management import BaseCommand, CommandError
from django.utils.datastructures import OrderedSet
from django.utils.module_loading import import_string as import_dotted_path


class Command(BaseCommand):
    help = (
        "Runs a Python interactive interpreter. Tries to use IPython or "
        "bpython, if one of them is available. Any standard input is executed "
        "as code."
    )

    requires_system_checks = []
    shells = ["ipython", "bpython", "python"]

    def add_arguments(self, parser):
        parser.add_argument(
            "--no-startup",
            action="store_true",
            help=(
                "When using plain Python, ignore the PYTHONSTARTUP environment "
                "variable and ~/.pythonrc.py script."
            ),
        )
        parser.add_argument(
            "--no-imports",
            action="store_true",
            help="Disable automatic imports of models.",
        )
        parser.add_argument(
            "-i",
            "--interface",
            choices=self.shells,
            help=(
                "Specify an interactive interpreter interface. Available options: "
                '"ipython", "bpython", and "python"'
            ),
        )
        parser.add_argument(
            "-c",
            "--command",
            help=(
                "Instead of opening an interactive shell, run a command as Django and "
                "exit."
            ),
        )

    def ipython(self, options):
        from IPython import start_ipython

        start_ipython(argv=[], user_ns=self.get_namespace(**options))

    def bpython(self, options):
        import bpython

        bpython.embed(self.get_namespace(**options))

    def python(self, options):
        import code

        # Set up a dictionary to serve as the environment for the shell.
        imported_objects = self.get_namespace(**options)

        # We want to honor both $PYTHONSTARTUP and .pythonrc.py, so follow system
        # conventions and get $PYTHONSTARTUP first then .pythonrc.py.
        if not options["no_startup"]:
            for pythonrc in OrderedSet(
                [os.environ.get("PYTHONSTARTUP"), os.path.expanduser("~/.pythonrc.py")]
            ):
                if not pythonrc:
                    continue
                if not os.path.isfile(pythonrc):
                    continue
                with open(pythonrc) as handle:
                    pythonrc_code = handle.read()
                # Match the behavior of the cpython shell where an error in
                # PYTHONSTARTUP prints an exception and continues.
                try:
                    exec(compile(pythonrc_code, pythonrc, "exec"), imported_objects)
                except Exception:
                    traceback.print_exc()

        # By default, this will set up readline to do tab completion and to read and
        # write history to the .python_history file, but this can be overridden by
        # $PYTHONSTARTUP or ~/.pythonrc.py.
        try:
            hook = sys.__interactivehook__
        except AttributeError:
            # Match the behavior of the cpython shell where a missing
            # sys.__interactivehook__ is ignored.
            pass
        else:
            try:
                hook()
            except Exception:
                # Match the behavior of the cpython shell where an error in
                # sys.__interactivehook__ prints a warning and the exception
                # and continues.
                print("Failed calling sys.__interactivehook__")
                traceback.print_exc()

        # Set up tab completion for objects imported by $PYTHONSTARTUP or
        # ~/.pythonrc.py.
        try:
            import readline
            import rlcompleter

            readline.set_completer(rlcompleter.Completer(imported_objects).complete)
        except ImportError:
            pass

        # Start the interactive interpreter.
        code.interact(local=imported_objects)

    def get_auto_imports(self):
        """Return a sequence of import paths for objects to be auto-imported.

        By default, import paths for models in INSTALLED_APPS are included,
        with models from earlier apps taking precedence in case of a name
        collision.

        For example, for an unchanged INSTALLED_APPS, this method returns:

        [
            "django.contrib.sessions.models.Session",
            "django.contrib.contenttypes.models.ContentType",
            "django.contrib.auth.models.User",
            "django.contrib.auth.models.Group",
            "django.contrib.auth.models.Permission",
            "django.contrib.admin.models.LogEntry",
        ]

        """
        app_models_imports = [
            f"{model.__module__}.{model.__name__}"
            for model in reversed(apps.get_models())
            if model.__module__
        ]
        return app_models_imports

    def get_namespace(self, **options):
        if options and options.get("no_imports"):
            return {}

        verbosity = options["verbosity"] if options else 0

        try:
            apps.check_models_ready()
        except AppRegistryNotReady:
            if verbosity > 0:
                settings_env_var = os.getenv("DJANGO_SETTINGS_MODULE")
                self.stdout.write(
                    "Automatic imports are disabled since settings are not configured."
                    f"\nDJANGO_SETTINGS_MODULE value is {settings_env_var!r}.\n"
                    "HINT: Ensure that the settings module is configured and set.",
                    self.style.ERROR,
                    ending="\n\n",
                )
            return {}

        path_imports = self.get_auto_imports()
        if path_imports is None:
            return {}

        auto_imports = defaultdict(list)
        import_errors = []
        for path in path_imports:
            try:
                obj = import_dotted_path(path) if "." in path else import_module(path)
            except ImportError:
                import_errors.append(path)
                continue

            if "." in path:
                module, name = path.rsplit(".", 1)
            else:
                module = None
                name = path
            if (name, obj) not in auto_imports[module]:
                auto_imports[module].append((name, obj))

        namespace = {
            name: obj for items in auto_imports.values() for name, obj in items
        }

        if verbosity < 1:
            return namespace

        errors = len(import_errors)
        if errors:
            msg = "\n".join(f"  {e}" for e in import_errors)
            objects = "objects" if errors != 1 else "object"
            self.stdout.write(
                f"{errors} {objects} could not be automatically imported:\n\n{msg}",
                self.style.ERROR,
                ending="\n\n",
            )

        amount = len(namespace)
        objects_str = "objects" if amount != 1 else "object"
        msg = f"{amount} {objects_str} imported automatically"

        if verbosity < 2:
            if amount:
                msg += " (use -v 2 for details)"
            self.stdout.write(f"{msg}.", self.style.SUCCESS, ending="\n\n")
            return namespace

        top_level = auto_imports.pop(None, [])
        import_string = "\n".join(
            [f"  import {obj}" for obj, _ in top_level]
            + [
                f"  from {module} import {objects}"
                for module, imported_objects in auto_imports.items()
                if (objects := ", ".join(i[0] for i in imported_objects))
            ]
        )

        try:
            import isort
        except ImportError:
            pass
        else:
            import_string = isort.code(import_string)

        if import_string:
            msg = f"{msg}:\n\n{import_string}"
        else:
            msg = f"{msg}."

        self.stdout.write(msg, self.style.SUCCESS, ending="\n\n")

        return namespace

    def handle(self, **options):
        # Execute the command and exit.
        if options["command"]:
            exec(options["command"], {**globals(), **self.get_namespace(**options)})
            return

        # Execute stdin if it has anything to read and exit.
        # Not supported on Windows due to select.select() limitations.
        if (
            sys.platform != "win32"
            and not sys.stdin.isatty()
            and select.select([sys.stdin], [], [], 0)[0]
        ):
            exec(sys.stdin.read(), {**globals(), **self.get_namespace(**options)})
            return

        available_shells = (
            [options["interface"]] if options["interface"] else self.shells
        )

        for shell in available_shells:
            try:
                return getattr(self, shell)(options)
            except ImportError:
                pass
        raise CommandError("Couldn't import {} interface.".format(shell))
