{ "cells": [ { "cell_type": "markdown", "id": "79d36a4a", "metadata": {}, "source": [ "# Синхронизация потоков #" ] }, { "cell_type": "markdown", "id": "9d01200a", "metadata": {}, "source": [ "На этой лекции мы поговорим про синхронизацию потоков и обсудим очереди, блокировки и условные переменные. Если вы запустите несколько потоков для решения своей задачи, то вам рано или поздно придётся обмениваться данными между потоками. Для этого как раз понадобятся все эти вещи, которые мы обсудим.\n", "\n", "Давайте для начала разберёмся, как можно использовать модуль `queue` и очереди для обмена данными между потоками." ] }, { "cell_type": "code", "execution_count": null, "id": "21d47d71", "metadata": {}, "outputs": [], "source": [ "# Очереди, модуль queue\n", "from queue import Queue\n", "from threading import Thread\n", "\n", "def worker(q, n):\n", " while True:\n", " item = q.get()\n", " if item is None:\n", " break\n", " \n", " print(f\"process data: {n} {item}\")\n", " \n", "q = Queue(5)\n", "th1 = Thread(target=worker, args=(q, 1))\n", "th2 = Thread(target=worker, args=(q, 2))\n", "th1.start(); th2.start()\n", "\n", "for i in range(50):\n", " q.put(i)\n", " \n", "q.put(None); q.put(None)\n", "th1.join(); th2.join()" ] }, { "cell_type": "markdown", "id": "4278c862", "metadata": {}, "source": [ "Использование очередей выглядит достаточно простым. На примере мы создаём объект типа очередь с максимальным размером 5. Для помещения элементов в очередь необходимо использовать метод `put` для объекта `queue`. Хочу обратить ваше внимание, что, если в очереди будет уже пять элементов, то вызов метода `put` заблокирует выполнение потока, который вызвал этот метод, и будет ждать, пока не появится в очереди свободное место.\n", "\n", "Итак, для обработки сообщений этой очереди мы создаём пару потоков — это объекты класса `Thread`, мы обсуждали это уже на предыдущей лекции, как создавать потоки. Передаём в этот объект функцию `worker`, и этой функции мы передаём нашу очередь. \n", "\n", "Итак, наша функция `worker` будет выполняться в двух потоках независимых. Они будут выполняться параллельно. Каждый поток в бесконечном цикле будет получать сообщение из очереди при помощи вызова метода `get` у объекта `q`.\n", "\n", "Давайте выполним этот код в консоли и посмотрим, как он выполняется. Я подготовил заранее код. Запускаем его при помощи интерпретатора Python 3." ] }, { "cell_type": "code", "execution_count": 1, "id": "74d6f330", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "process data: 1 0\n", "process data: 2 1\n", "process data: 1 2\n", "process data: 1 3\n", "process data: 2 4\n", "process data: 1 5\n", "process data: 2 6\n", "process data: 2 7\n", "process data: 1 8\n", "process data: 2 9\n", "process data: 2 10\n", "process data: 1 11\n", "process data: 2 12\n", "process data: 2 13\n", "process data: 1 14\n", "process data: 2 15\n", "process data: 2 16\n", "process data: 1 17\n", "process data: 2 18\n", "process data: 2 19\n", "process data: 1 20\n", "process data: 2 21\n", "process data: 2 22\n", "process data: 1 23\n", "process data: 2 24\n", "process data: 2 25\n", "process data: 1 26\n", "process data: 2 27\n", "process data: 2 28\n", "process data: 1 29\n", "process data: 2 30\n", "process data: 2 31\n", "process data: 1 32\n", "process data: 2 33\n", "process data: 2 34\n", "process data: 1 35\n", "process data: 2 36\n", "process data: 2 37\n", "process data: 1 38\n", "process data: 2 39\n", "process data: 2 40\n", "process data: 1 41\n", "process data: 2 42\n", "process data: 2 43\n", "process data: 1 44\n", "process data: 2 45\n", "process data: 2 46\n", "process data: 1 47\n", "process data: 2 48\n", "process data: 2 49\n" ] } ], "source": [ "! python ex_queue.py" ] }, { "cell_type": "markdown", "id": "8508171d", "metadata": {}, "source": [ "Хочу обратить ваше внимание, что, действительно, наши потоки выполнили этот код параллельно, и мы видим то цифру 2, то цифру 1 в нашем стандартном выводе. Большое внимание нужно уделить правильному завершению потока. С точки зрения процесса, ресурсами владеет процесс, то есть выделенная память или открытый файл — ими владеет процесс. Но процесс ничего не знает о том, что делает с этими ресурсами поток. И если поток завершить аварийно, то файл может остаться незакрытым, блокировка может остаться невысвобожденной, и теоретически это может привести к непредвиденным последствиям. Поэтому в Python не существует функции аварийного завершения потока. Очень важно делать это правильно в функции самого потока. На приведённом примере в очередь помещается специальное значение `None`, и функция потока при проверке условия завершает свою работу. Код с использованием очередей выглядит достаточно просто, и предпочтительнее использовать очереди при разработке многопоточных программ.\n", "\n", "Тем не менее, иногда приходится использовать блокировки. Блокировки как минимум замедляют работу программы. Тем не менее, иногда их нужно применять. Давайте рассмотрим пример, который я привёл на слайде." ] }, { "cell_type": "code", "execution_count": null, "id": "85c77ff3", "metadata": {}, "outputs": [], "source": [ "# Синхронизация потоков, race condition\n", "import threading\n", "\n", "class Point(object):\n", " def __init__(self, x, y):\n", " self.set(x, y)\n", " \n", " def get(self):\n", " return (self.x, self.y)\n", " \n", " def set(self, x, y):\n", " self.x = x\n", " self.y = y\n", "\n" ] }, { "cell_type": "markdown", "id": "492e4ec8", "metadata": {}, "source": [ "Предположим, у нас есть класс Точка, и у класса Точка есть координаты `x` и `y`. Также у этого класса есть метод `get`, который возвращает эти координаты, и метод `set`, который задаёт новые координаты." ] }, { "cell_type": "code", "execution_count": null, "id": "fea1d1d9", "metadata": {}, "outputs": [], "source": [ "# use in threads\n", "my_point = Point(10, 20)\n", "my_point.set(15, 10)\n", "my_point.get()" ] }, { "cell_type": "markdown", "id": "46e2e5b2", "metadata": {}, "source": [ "Предположим, что мы создали объект класса Точка и используем этот объект в большом количестве потоков. Эти потоки, некоторые вызывают метод `get`, некоторые вызывают метод `set`. Если бы не было блокировок, то может возникнуть такая ситуация, когда один поток изменил значение переменной `x` или координаты `x`, а другой поток в это время вернул координаты `x` и `y`. Мы получили неконсистентное состояние объекта, когда у него частично одна координата изменена, а вторая нет. Для того чтобы избежать подобных ситуаций, и нужны блокировки. Для того чтобы создать объект блокировки необходимо вызвать вот такую конструкцию." ] }, { "cell_type": "code", "execution_count": null, "id": "ba350d3c", "metadata": {}, "outputs": [], "source": [ "# Синхронизация потоков, блокировки\n", "import threading\n", "\n", "class Point(object):\n", " def __init__(self, x, y):\n", " self.mutex = threading.RLock()\n", " self.set(x, y)\n", " \n", " def get(self):\n", " with self.mutex:\n", " return (self.x, self.y)\n", " \n", " def set(self, x, y):\n", " with self.mutex:\n", " self.x = x\n", " self.y = y" ] }, { "cell_type": "markdown", "id": "937fbcc5", "metadata": {}, "source": [ "Создаём объект блокировки, и при помощи контекстного менеджера мы захватываем блокировку, а при выходе из контекстного менеджера блокировка высвобождается. Таким образом, легко и удобно создавать блокировки на Python. Подобные ситуации иногда называют гонкой за ресурсами, или `race condition`.\n", "\n", "Давайте рассмотрим ещё один вариант применения блокировок. Их можно использовать без контекстного менеджера. Выглядит это тоже довольно-таки просто." ] }, { "cell_type": "code", "execution_count": null, "id": "3d55ca1f", "metadata": {}, "outputs": [], "source": [ "# Синхронизация потоков, блокировки\n", "import threading\n", "\n", "a = threading.RLock()\n", "b = threading.RLock()\n", "\n", "def foo():\n", " try:\n", " a.acquire()\n", " b.acquire()\n", " finally:\n", " a.release()\n", " b.release()" ] }, { "cell_type": "markdown", "id": "72d1888f", "metadata": {}, "source": [ "Мы создаём объекты класса `RLock` и затем вызываем методы `acquire` — это получить или захватить блокировку и метод `release` для того, чтобы высвободить её.\n", "\n", "Если мы запустим подобный код в большом количестве процессов, то рано или поздно это приведёт к ситуации, которая называется `deadlock`. Дело в том, что мы освобождаем в неправильной последовательности блокировки. Нужно учитывать это в своих программах и отдавать предпочтение использованию контекстного менеджера при работе с блокировками.\n", "\n", "Также в Python существует ещё и объект класса `Lock`, а не `RLock`, но предпочтительнее использовать объекты `RLock`. Они позволяют в одном потоке получить блокировку дважды.\n", "\n", "В Python существует ещё один механизм для синхронизации потоков. Он называется \"условные переменные\". Давайте рассмотрим класс Очередь." ] }, { "cell_type": "code", "execution_count": 2, "id": "cc49d357", "metadata": {}, "outputs": [], "source": [ "# Синхронизация потоков, условные переменные\n", "class Queue(object):\n", " def __init__(self, size=5):\n", " self._size = size\n", " self._queue = []\n", " self._mutex = threading.RLock()\n", " self._empty = threading.Condition(self._mutex)\n", " self._full = threading.Condition(self._mutex)\n", " \n", " def put(self, val):\n", " with self._full:\n", " while len(self._queue) >= self._size:\n", " self._full.wait()\n", " \n", " self._queue.append(val)\n", " self._empty.notify()\n", " \n", " def get(self):\n", " with self._empty:\n", " while len(self._queue) == 0:\n", " self._empty.wait()\n", "\n", " ret = self._queue.pop(0)\n", " self._full.notify()\n", " return ret" ] }, { "cell_type": "markdown", "id": "df62337b", "metadata": {}, "source": [ "Это очередь, с которой нужно будет работать в большом количестве потоков. У неё есть операции `put` и `get`, и, конечно же, у неё есть какой-то размер. Очередь не должна расти больше заданного размера. Если мы выполним операцию `put`, а в очереди уже достаточно большое количество элементов, то нам необходимо ждать пока это количество уменьшится. Вопрос — сколько ждать? Неизвестно. Ответа на этот вопрос мы не получим.\n", "\n", "Для решения подобной задачи можно использовать условные переменные. Условные переменные в конструктор получает объект блокировки. Он есть по умолчанию, но если у нас эти переменные взаимозависимые, то необходимо использовать общую блокировку. И при помощи этих условных переменных очень легко и удобно ожидать событий при помощи вызова `wait` и оповещать все потоки, которые сейчас ждут наступления этого события. Таким образом, очень легко и удобно можно реализовать очередь в Python, которая работает в многопоточной программе.\n", "\n", "Итак, мы обсудили вопрос синхронизации потоков. Потоки выполняются в рамках одного процесса, в котором они были созданы. Потоки разделяют память и все ресурсы процесса. Потоки более легковесны по сравнению с процессами. Для обмена данными между потоками можно использовать очереди. Мы рассмотрели пример программы. Использование очередей предпочтительнее по сравнению с использованием блокировок. Также все потоки в Python исполняются с глобальной блокировкой интерпретатора, и об этом мы поговорим в следующей лекции." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 5 }