Создание отдельного процесса, запуск задачи и получение результата выполнения.
Синтаксис:
import multiprocessing proc = multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs=<>, *, daemon=None)
Параметры:
Конструктор всегда следует вызывать с ключевыми аргументами.
- group=None — не используется модулем,
- target=None — вызываемый объект (функция),
- name=None — имя процесса,
- args=() — аргументы для target ,
- kwargs=<> — ключевые аргументы для target ,
- daemon=None — флаг для демонизации процесса.
Возвращаемое значение:
Описание:
Класс Process() модуля multiprocessing запускает вызываемый объект target на исполнение, который будет выполняется в отдельном процессе/ядре процессора.
Есть два способа запустить какое либо действие:
- Передать вызываемый объект (функцию) target в конструктор.
- Переопределить метод Process.run() в подклассе.
Python. Урок 22. Потоки и процессы в Python. Часть 1. Управление потоками
Как устроен Python? ► Детальный разбор
Этот урок открывает цикл статей, посвященных параллельному программированию в Python. В рамках данного урока будут рассмотрены вопросы терминологии, относящиеся к параллельному программированию, GIL, создание и управление потоками в Python.
- Синхронность и асинхронность. Параллелизм в конкурентность
- Несколько слов о GIL
- Потоки в Python
- Создание и ожидание завершения работы потоков. Класс Thread
- Создание классов наследников от Thread
- Принудительное завершение работы потока
- Потоки-демоны
Синхронность и асинхронность. Параллелизм и конкурентность
Для начала разберемся с терминологией, которую мы будем использовать в рамках данного цикла статей, посвященного параллельному программированию на Python .
Синхронное выполнение программы подразумевает последовательное выполнение операций. Асинхронное – предполагает возможность независимого выполнения задач.
Приведем пример из математики, представьте, что у нас есть функция:
Для того, чтобы определить, чему равно значение функции при x=4, нам необходимо вначале вычислить выражение (x+1) и только потом, полученное значение возвести в квадрат:
Это пример синхронного порядка вычисления: операции были выполнены последовательно и, в данном случае, по-другому быть не могло.
Теперь посмотрите на такую функцию:
Для вычисления значения функции в точке x=4 мы также можем придерживаться синхронного порядка: вначале выполнить операцию возведения в квадрат, потом вычислим произведение и просуммируем полученные результаты:
Если внимательно посмотреть на эту функцию, то можно заметить, что для того, чтобы вычислить x^2 не нужно знать значение произведения 2*x и наоборот. Операции вычисления квадратного корня и произведения можно выполнять независимо друг от друга.
Python с нуля. Урок 1 | Первая программа. Переменные
… значения 4^2 и 2*4 вычисляются независимо разными вычислителями…
Более житейский пример будет выглядеть так: синхронность — это когда вы сначала сварили картошку, а потом помыли кастрюлю, и помыть ее раньше того, как в ней приготовили вы не можете. Асинхронность — это когда вы варите картошку и одновременно прибираетесь на кухне – эти задачи можно выполнять параллельно.
Теперь несколько слов о конкурентности и параллелизме . Конкурентность предполагает выполнение нескольких задач одним исполнителем. Из примера с готовкой: один человек варит картошку и прибирается, при этом, в процессе, он может переключаться: немного прибрался, пошел помешал-посмотрел на картошку, и делает он это до тех пор, пока все не будет готово.
Параллельность предполагает параллельное выполнение задач разными исполнителями: один человек занимается готовкой, другой приборкой. В примере с математикой операции 4^2 и 2*4 могут выполнять два разных процессора.
Несколько слов о GIL
Для того, чтобы двигаться дальше необходимо сказать несколько слов о GIL . GIL — это аббревиатура от Global Interpreter Lock – глобальная блокировка интерпретатора. Он является элементом эталонной реализации языка Python , которая носит название CPython . Суть GIL заключается в том, что выполнять байт код может только один поток.
Это нужно для того, чтобы упростить работу с памятью (на уровне интерпретатора) и сделать комфортной разработку модулей на языке C . Это приводит к некоторым особенностям, о которых необходимо помнить. Условно, все задачи можно разделить на две большие группы: в первую входят те, что преимущественно используют процессор для своего выполнения, например, математические, их ещё называют CPU-bound , во вторую – задачи работающие с вводом выводом (диск, сеть и т.п.), такие задачи называют IO-bound . Если вы запустили в одном интерпретаторе несколько потоков, которые в основном используют процессор, то скорее всего получите общее замедление работы, а не прирост производительности.
Пока выполняется одна задача, остальные простаивают (из-за GIL), переключение происходит через определенные промежутки времени. Таким образом, в каждый конкретный момент времени, будет выполняться только один поток, несмотря на то, что у вас может быть многоядерный процессор (или многопроцессорный сервер), плюс ко всему, будет тратиться время на переключение между задачами. Если код в потоках в основном выполняет операции ввода-вывода, то в этом случае ситуация будет в вашу пользу. В CPython все стандартные библиотечные функций, которые выполняют блокирующий ввод-вывод, освобождают GIL , это дает возможность поработать другим потокам, пока ожидается ответ от ОС.
Потоки в Python
Потоки позволяют запустить выполнение нескольких задач в конкурентном режиме в рамках одного процесса интерпретатора. При этом, нужно помнить о GIL . Все потоки будут выполняться на одном CPU , даже если задачи могут выполняться параллельно. Поэтому есть такое правило, если ваши задачи в основном потребляют ресурсы процессора, то используйте процессы, если ввод-вывод, то потоки и другие инструменты асинхронного программирования, которые в Python обладают довольно мощным функционалом.
Создание и ожидание завершения работы потоков. Класс Thread
За создание, управление и мониторинг потоков отвечает класс Thread из модуля threading . Поток можно создать на базе функции, либо реализовать свой класс – наследник Thread и переопределить в нем метод run() . Для начала рассмотрим вариант создания потока на базе функции:
from threading import Thread from time import sleep def func(): for i in range(5): print(f»from child thread: «) sleep(0.5) th = Thread(target=func) th.start() for i in range(5): print(f»from main thread: «) sleep(1)
В приведенном выше примере мы импортировали нужные модули. После этого объявили функцию func() , которая выводит пять раз сообщение с числовым маркером с задержкой в 500 мс. Далее создали объект класса Thread , в нем, через параметр target, указали, какую функцию запускать как поток и запустили его. В главном потоке добавили код вывода сообщений с интервалом в 1000 мс.
В результате запуска этого кода получим следующее:
from child thread: 0 from main thread: 0 from child thread: 1 from main thread: 1 from child thread: 2 from child thread: 3 from main thread: 2 from child thread: 4 from main thread: 3 from main thread: 4
Как вы можете видеть, код из главного и дочернего потоков выполняются псевдопараллельно (во всяком случае создается такое ощущение), т.к. задержка в дочернем потоке меньше, то сообщение из него появляются чаще.
Если необходимо дождаться завершения работы потока(ов) перед тем как начать выполнять какую-то другую работу, то воспользуйтесь методом join() :
th1 = Thread(target=func) th2 = Thread(target=func) th1.start() th2.start() th1.join() th2.join() print(«—> stop»)
У join() есть параметр timeout , через который задается время ожидания завершения работы потоков.
Для того, чтобы определить выполняет ли поток какую-то работу или завершился используется метод is_alive() .
th = Thread(target=func) print(f»thread status: «) th.start() print(f»thread status: «) sleep(5) print(f»thread status: «)
В результате получим следующее:
thread status: False from child thread: 0 thread status: True from child thread: 1 from child thread: 2 from child thread: 3 from child thread: 4 thread status: False
Для задания потоку имени воспользуйтесь свойством name .
Создание классов наследников от Thread
Ещё одни способ создавать и управлять потоками – это реализовать класс наследник от Thread и переопределить у него метод run() .
class CustomThread(Thread): def __init__(self, limit): Thread.__init__(self) self._limit = limit def run(self): for i in range(self._limit): print(f»from CustomThread: «) sleep(0.5) cth = CustomThread(3) cth.start()
В терминале получим следующее:
from CustomThread: 0 from CustomThread: 1 from CustomThread: 2
Принудительное завершение работы потока
В Python у объектов класса Thread нет методов для принудительного завершения работы потока. Один из вариантов решения этой задачи – это создать специальный флаг, через который потоку будет передаваться сигнал остановки. Доступ к такому флагу должен управляться объектом синхронизации.
from threading import Thread, Lock from time import sleep lock = Lock() stop_thread = False def infinit_worker(): print(«Start infinit_worker()») while True: print(«—> thread work») lock.acquire() if stop_thread is True: break lock.release() sleep(0.1) print(«Stop infinit_worker()») # Create and start thread th = Thread(target=infinit_worker) th.start() sleep(2) # Stop thread lock.acquire() stop_thread = True lock.release()
Если мы запустим эту программу, то в консоли увидим следующее:
Start infinit_worker() —> thread work —> thread work —> thread work —> thread work —> thread work Stop infinit_worker()
Разберемся с этим кодом более подробно. В строке 4 мы создаем объект класса Lock , он используется для синхронизации доступа к ресурсам из нескольких потоков, про них мы более подробно расскажем в следующей статье. В нашем случае, ресурс — это переменная stop_thread , объявленная в строке 6, которая используется как сигнал для остановки потока. После этого, в строке 8, объявляется функция infinit_worker() , ее мы запустим как поток. В ней выполняется бесконечный цикл, каждый проход которого отмечается выводом в терминал сообщения “ –> thread work ” и проверкой состояния переменной stop_thread . В главном потоке программы создается и запускается дочерний поток (строки 24, 25), выполняется функция задержки и принудительно завершается поток путем установки переменной stop_thread значения True .
Потоки-демоны
Есть такая разновидность потоков, которые называются демоны (терминология взята из мира Unix -подобных систем). Python-приложение не будет закрыто до тех пор, пока в нем работает хотя бы один недемонический поток.
def func(): for i in range(5): print(f»from child thread: «) sleep(0.5) th = Thread(target=func) th.start() print(«App stop»)
from child thread: 0 App stop from child thread: 1 from child thread: 2 from child thread: 3 from child thread: 4
Как вы можете видеть, приложение продолжает работать, даже после того, как главный поток завершился (сообщение: “App stop”).
Для того, чтобы потоки не мешали остановке приложения (т.е. чтобы они останавливались вместе с завершением работы программы) необходимо при создании объекта Thread аргументу daemon присвоить значение True , либо после создания потока, перед его запуском присвоить свойству deamon значение True . Изменим процесс создания потока в приведенной выше программе:
th = Thread(target=func, daemon=True)
Запустим ее, получим следующий результат:
from child thread: 0 App stop
Поток остановился вместе с остановкой приложения.
P.S.
Вводные уроки по “Линейной алгебре на Python” вы можете найти соответствующей странице нашего сайта . Все уроки по этой теме собраны в книге “Линейная алгебра на Python”.
Если вам интересна тема анализа данных, то мы рекомендуем ознакомиться с библиотекой Pandas. Для начала вы можете познакомиться с вводными уроками. Все уроки по библиотеке Pandas собраны в книге “Pandas. Работа с данными”.
Python. Урок 22. Потоки и процессы в Python. Часть 1. Управление потоками : 2 комментария
- Константин 19.08.2020 Замечательные уроки, коротко и понятно излагаете важные вещи!
Жду следующие статьи, продолжайте в том же духе!
Источник: devpractice.ru
Модуль multiprocessing на примерах
2 февраля 2018 г.
Archy
Просмотров: 129997
RSS
7
Примеры Python » Общие вопросы
Locker, Logging, multiprocessing, Pipe, Pool, Queue
Модуль multiprocessing был добавлен в Python версии 2.6. Изначально он был определен в PEP 371 Джесси Ноллером и Ричардом Одкерком. Модуль multiprocessing позволяет вам создавать процессы таким же образом, как при создании потоков при помощи модуля threading.
Суть в том, что, в связи с тем, что мы теперь создаем процессы, вы можете обойти GIL (Global Interpreter Lock) и воспользоваться возможностью использования нескольких процессоров на компьютере. Пакет multiprocessing также включает ряд API, которых вообще нет в модуле threading. Например, есть очень удобный класс Pool, который вы можете использовать для параллельного выполнения функции между несколькими входами. Мы рассмотрим Pool немного позже. Мы начнем с класса Process модуля multiprocessing.
Приступим к работе с Multiprocessing
Класс Process очень похож на класс Thread модуля threading. Давайте попробуем создать несколько процессов, которые вызывают одну и ту же функцию, и посмотрим, как это сработает:
import os from multiprocessing import Process def doubler(number): «»» Функция умножитель на два «»» result = number * 2 proc = os.getpid() print(‘ doubled to by process id: ‘.format( number, result, proc)) if __name__ == ‘__main__’: numbers = [5, 10, 15, 20, 25] procs = [] for index, number in enumerate(numbers): proc = Process(target=doubler, args=(number,)) procs.append(proc) proc.start() for proc in procs: proc.join()
Для этого примера мы импортируем Process и создаем функцию doubler.
Внутри функции, мы дублируем число, которое мы ей передали. Мы также используем модуль os, чтобы получить ID нынешнего процесса. Это скажет нам, какой именно процесс вызывает функцию. Далее, в нижнем блоке кода, мы создаем несколько Процессов и начинаем их.
Самый последний цикл только вызывает метод join() для каждого из процессов, что говорит Python подождать, пока процесс завершится. Если вам нужно остановить процесс, вы можете вызвать метод terminate(). Когда вы запустите этот код, вы получите выдачу, на подобие этой:
5 doubled to 10 by process id: 10468 10 doubled to 20 by process id: 10469 15 doubled to 30 by process id: 10470 20 doubled to 40 by process id: 10471 25 doubled to 50 by process id: 10472
Все же иногда приятно иметь читабельное название процессов. К счастью, класс Process дает возможность вам получить доступ к названию вашего процесса. Давайте посмотрим:
import os from multiprocessing import Process, current_process def doubler(number): result = number * 2 proc_name = current_process().name print(‘ doubled to by: ‘.format( number, result, proc_name)) if __name__ == ‘__main__’: numbers = [5, 10, 15, 20, 25] procs = [] proc = Process(target=doubler, args=(5,)) for index, number in enumerate(numbers): proc = Process(target=doubler, args=(number,)) procs.append(proc) proc.start() proc = Process(target=doubler, name=’Test’, args=(2,)) proc.start() procs.append(proc) for proc in procs: proc.join()
На этот раз мы импортируем кое-что дополнительно: current_process.
Это примерно то же самое, что и current_thread модуля threading. Мы используем его для того, чтобы получить имя потока, который вызывает нашу функцию. Обратите внимание на то, что мы не указывали название первых пяти процессов. И только шестой мы назвали Test. Давайте посмотрим, какую выдачу мы получим:
5 doubled to 10 by: Process-2 10 doubled to 20 by: Process-3 15 doubled to 30 by: Process-4 20 doubled to 40 by: Process-5 25 doubled to 50 by: Process-6 2 doubled to 4 by: Test
Выдача показывает, что модуль multiprocessing назначает номер каждому процессу, как часть его названия по умолчанию. Конечно, когда мы лично определяем название, модуль не будет добавлять число к нашему названию
Замки (Locks)
Модуль multiprocessing поддерживает замки так же, как и модуль threading. Все что вам нужно, это импортировать Lock, повесить его, сделать что-нибудь и снять его. Давайте посмотрим:
from multiprocessing import Process, Lock def printer(item, lock): «»» Выводим то что передали «»» lock.acquire() try: print(item) finally: lock.release() if __name__ == ‘__main__’: lock = Lock() items = [‘tango’, ‘foxtrot’, 10] for item in items: p = Process(target=printer, args=(item, lock)) p.start()
Здесь мы создали простую функцию вывода, которая выводит все, что вы ей передаете. Чтобы не дать процессам конфликтовать друг с другом, мы используем объект Lock.
Этот код зациклится над нашим списком трех объектов и создаст процесс для каждого из них. Каждый процесс будет вызывать нашу функцию, и передавать её одному из объектов. Так как мы используем замки, следующий процесс в строке будет ждать, пока замок не снимается, после чего он сможет продолжить.
Логирование (Logging)
Логирование процессов немного отличается от логирования потоков. Причина в том, что пакет logging не использует замки, предназначенные для процессов, так что в итоге вы можете получить результат, который состоит из кучи перемешанных между собой процессов. Давайте попробуем добавить базовый логгинг к предыдущему примеру. Вот код:
import logging import multiprocessing from multiprocessing import Process, Lock def printer(item, lock): «»» Выводим то что передали «»» lock.acquire() try: print(item) finally: lock.release() if __name__ == ‘__main__’: lock = Lock() items = [‘tango’, ‘foxtrot’, 10] multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() logger.setLevel(logging.INFO) for item in items: p = Process(target=printer, args=(item, lock)) p.start()
Простейший способ вести журнал, это отправить все на stderr.
Мы можем сделать это, вызвав функцию log_to_stderr(). Далее мы вызываем функцию get_logger для получения доступа к логгеру и настраиваем его уровень логгинга на INFO. Остальная часть кода остается такой же, какой и была. Обратите внимание на то, что я не вызываю метод join() здесь. Вместо этого, поток parent (другими словами, ваш скрипт) вызовет join() лично. Когда вы сделаете это, вы получите что-то на подобие:
[INFO/Process-1] child process calling self.run() tango [INFO/Process-1] process shutting down [INFO/Process-1] process exiting with exitcode 0 [INFO/Process-2] child process calling self.run() [INFO/MainProcess] process shutting down foxtrot [INFO/Process-2] process shutting down [INFO/Process-3] child process calling self.run() [INFO/Process-2] process exiting with exitcode 0 10 [INFO/MainProcess] calling join() for process Process-3 [INFO/Process-3] process shutting down [INFO/Process-3] process exiting with exitcode 0 [INFO/MainProcess] calling join() for process Process-2
Давайте пойдем дальше, и рассмотрим класс Pool поближе
Класс Pool
Класс Pool используется для показа пула рабочих процессов. Он включает в себя методы, которые позволяют вам разгружать задачи к рабочим процессам. Давайте посмотрим на простейший пример:
from multiprocessing import Pool def doubler(number): return number * 2 if __name__ == ‘__main__’: numbers = [5, 10, 20] pool = Pool(processes=3) print(pool.map(doubler, numbers))
Здесь мы создали экземпляр Pool и указали ему создать три рабочих процесса. Далее мы используем метод map для отображения функции для каждого процесса. Наконец мы выводим результат, что в нашем случае является списком: [10, 20, 40]. Вы также можете получить результат вашего процесса в пуле, используя метод apply_async:
from multiprocessing import Pool def doubler(number): return number * 2 if __name__ == ‘__main__’: pool = Pool(processes=3) result = pool.apply_async(doubler, (25,)) print(result.get(timeout=1))
Так мы можем запросить результат процесса. В этом суть работы функции get. Она пытается получить наши результаты. Обратите внимание на то, что мы также настроили обратный отсчет, на тот случай, если что-нибудь произойдет с вызываемой нами функцией.
Мы не хотим, чтобы она была заблокирована.
Связь между процессами
Когда речь заходит о связи между процессами, модули нашего multiprocessing включают в себя два главных метода: Queue и Pipe. Работа Queue защищена как от процессов, так и от потоков. Давайте взглянем на достаточно простой пример:
from multiprocessing import Process, Queue sentinel = -1 def creator(data, q): «»» Creates data to be consumed and waits for the consumer to finish processing «»» print(‘Creating data and putting it on the queue’) for item in data: q.put(item) def my_consumer(q): «»» Consumes some data and works on it In this case, all it does is double the input «»» while True: data = q.get() print(‘data found to be processed: <>’.format(data)) processed = data * 2 print(processed) if data is sentinel: break if __name__ == ‘__main__’: q = Queue() data = [5, 10, 13, -1] process_one = Process(target=creator, args=(data, q)) process_two = Process(target=my_consumer, args=(q,)) process_one.start() process_two.start() q.close() q.join_thread() process_one.join() process_two.join()
Здесь нам только и нужно, что импортировать Process и Queue.
Далее мы создаем две функции, одна для создания данных и добавления их в очередь, и вторая для использования данных и обработки их. Добавление данных в Queue выполняется при помощи метода put(), в то время как получение данных из Queue выполняется через метод get. Последний кусок кода только создает объект Queue и несколько экземпляров Process, после чего возвращает их. Обратите внимание на то, что мы вызываем join() в наших объектах process больше, чем Queue.
Подведем итоги
Здесь мы прошли через достаточно большое количество материала. Вы узнали много чего нового о модуле multiprocessing для направления обычных функций, связи между процессами при помощи Queue, наименований потоков и многого другого. Разумеется, в документации Python предоставлено намного больше развернутой информации, которую я даже не начинал затрагивать в данной статье, так что настоятельно рекомендую с ней ознакомиться. Тем не менее, вы все-таки узнали много чего о том, как усилить мощность обработки вашего компьютера при помощи Python!
Еще записи по теме
- Итерации и циклы в Python
- Социальные закладки, что это?
- Игровой автомат Warlords: Crystals of Power — за незабываемыми победами в Вулкан казино
- Форматирование Python-кода
- Методы для работы со словарями в Python
- Бесплатное обучение программированию на Python для любителей ИТ
- Работа с файлами в Python
Источник: python-3.ru