You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

238 lines
18 KiB
Plaintext

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

{
"cells": [
{
"cell_type": "markdown",
"id": "8b2dae71",
"metadata": {},
"source": [
"# Обработка нескольких соединений #"
]
},
{
"cell_type": "markdown",
"id": "112a8b70",
"metadata": {},
"source": [
"Итак, на прошлой лекции мы рассматривали простые программы типа Клиент-Сервер и пробовали организовать взаимодействие между двумя процессами. А что делать, если этих процессов будет несколько или не несколько, а очень много. Как раз на этой лекции мы рассмотрим примеры обработки большого количества соединений на стороне сервера. Давайте рассмотрим наш изначальный пример, который мы запускали на серверной стороне."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a94b02c6",
"metadata": {},
"outputs": [],
"source": [
"# обработка нескольких соединений одновременно\n",
"import socket\n",
"\n",
"with socket.socket() as sock:\n",
" sock.bind((\"\", 10001))\n",
" sock.listen()\n",
" while True:\n",
" conn, addr = sock.accept()\n",
" print(f\"connected client: {addr}\")\n",
" \n",
" # процесс или поток для обработки соединения\n",
" with conn:\n",
" while True:\n",
" data = conn.recv(1024)\n",
" \n",
" if not data:\n",
" break\n",
" \n",
" print(data.decode(\"utf8\"))"
]
},
{
"cell_type": "markdown",
"id": "ba69797e",
"metadata": {},
"source": [
"Итак, мы создаем объект `socket`, вызываем `bind` и `listen`, и затем принимаем новое соединение. Если мы приняли соединение и начинаем его обработку в том же самом потоке управления, мы не можем принимать новые соединения. Если у нас будет большое количество клиентов, то все остальные клиенты будут вынуждены ждать, пока мы закончим работу с первым соединением. Какие подходы существуют для решения данной задачи?\n",
"\n",
"Конечно, мы можем создать процесс или поток для обработки отдельного соединения и выполнить в этом процессе или потоке код по его обработке. Давайте представим, что мы будем создавать процессы для обработки нового соединения, то есть у нас есть сервер, предположим, у нас есть 10,000 клиентов. Чтобы более конкретно говорить о каких-то примерах, можем представить себе мобильные приложения, которые сейчас так популярны. Например, они будут нам отправлять какие-то статистические данные. Мобильных приложений очень много, все они запускаются, и, предположим, 10,000 соединений одновременно приходит на наш сервер и нам надо обработать все эти запросы. Если мы создадим 10,000 процессов, это будет иметь ряд своих минусов. Как минимум, это потребует очень больших ресурсов от нашей операционной системы. На каждый процесс нужна память, каждый процесс нужно, чтобы операционная система управляла всем этим большим количеством процессов. Иногда, если даже и сам запрос требует небольшого количества ресурсов, например, нам что-то нужно просто записать в лог-файл, создание процесса на обработку этого соединения будет гораздо дороже, чем обработка. Тем не менее, такой подход иногда используется, и, если у вас небольшое количество соединений, плюсом будет то, что вы можете использовать все ядра операционной системы и распределять обработку по всем ядрам на сервере. Если же мы рассмотрим поток в качестве обработки нового соединения, то, как мы помним, все потоки работают в Python на одном ядре, и они ограничены GIL. Рано или поздно мы упремся в то, что нам не хватает одного ядра, и наш сервер будет отвечать не за приемлемое время, не за то, которое мы хотели бы от него ожидать. Тем не менее, и на потоках, особенно если они требуют операции ввода-вывода, можно получить достаточно производительный сервер. Давайте рассмотрим пример одновременной обработки сетевых запросов при помощи потоков."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4c754304",
"metadata": {},
"outputs": [],
"source": [
"# обработка нескольких соединений одновременно, потоки\n",
"import socket\n",
"import threading\n",
"\n",
"def process_request(conn, addr):\n",
" print(f\"connected client: {addr}\")\n",
" with conn:\n",
" while True:\n",
" data = conn.recv(1024)\n",
" \n",
" if not data:\n",
" break\n",
" \n",
" print(data.decode(\"utf8\"))\n",
"\n",
"\n",
"with socket.socket() as sock:\n",
" sock.bind((\"\", 10001))\n",
" sock.listen()\n",
" while True:\n",
" conn, addr = sock.accept()\n",
" \n",
" th = threading.Thread(target=process_request, args=(conn, addr))\n",
" th.start()"
]
},
{
"cell_type": "markdown",
"id": "2fd60033",
"metadata": {},
"source": [
"Итак, мы создаем `socket`, вызываем нами известные методы `bind` и `listen`. Затем в бесконечном цикле принимаем входящее соединение от клиента. Как только мы приняли это входящее соединение, мы должны создать поток. Делаем это мы при помощи модуля `threading`, создаем объект класса `thread`, передаём ему в качестве аргумента функцию и наше соединение, с которым будем дальше в этой функции работать. Запускаем поток, и в основном потоке мы продолжаем акцептить новые соединения, и, тем самым, мы обрабатываем уже существующее соединение и, обрабатывая, ждём новых. В данном случае у нас может быть создано большое количество потоков, в которых мы точно также обрабатываем это соединение. Как я уже говорил, если процесс обработки этих соединений будет заниматься вводом-выводом, то такой код будет достаточно производительным. Тем не менее, если будет недостаточно одного ядра операционной системы, можно этот процесс распараллелить. Давайте рассмотрим пример, когда можно использовать и потоки, и процессы одновременно."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d4eaeb07",
"metadata": {},
"outputs": [],
"source": [
"# обработка нескольких соединений одновременно, процессы и потоки\n",
"import socket\n",
"\n",
"with socket.socket() as sock:\n",
" sock.bind((\"\", 10001))\n",
" sock.listen()\n",
" # создание нескольких процессов\n",
" \n",
" while True:\n",
" # accept распределится \"равномерно\" между процессами\n",
" conn, addr = sock.accept()\n",
" # поток для обработки соединения\n",
" with conn:\n",
" while True:\n",
" data = conn.recv(1024)\n",
" \n",
" if not data:\n",
" break\n",
" \n",
" print(data.decode(\"utf8\"))"
]
},
{
"cell_type": "markdown",
"id": "ef549693",
"metadata": {},
"source": [
"Итак, для того чтобы обрабатывать одно соединение в нескольких процессах, нам нужно выполнить небольшой трюк. Итак, создаем объект класса `socket` в контекстном менеджере, вызываем `bind` и метод `listen`. После того как мы вызвали метод `listen`, мы должны создать несколько процессов, сделать `fork`. `Fork` мы рассматривали на предыдущих лекциях. После того как мы сделаем вызов `fork`, все ресурсы родительского процесса будут целиком и полностью скопированы в дочерние процессы, тем самым в наших дочерних процессах будет тот же самый `socket`. Если мы в этом `socket`'е сделаем вызов `accept` и будем ждать нового соединения от клиента, то системный вызов `accept` распределит равномерно между всеми дочерними процессами новые входящие соединения, а уже дальше в этих дочерних процессах, когда мы поймали новые соединения, мы уже сможем создать поток и обработать новые соединения.\n",
"\n",
"Здесь есть небольшой нюанс. Если, опять же, мы создадим несколько процессов, которые все одновременно делают системный вызов `accept`, то по умолчанию все они будут спать, а операционная система не будет потреблять никаких ресурсов. Но если будет приходить новое входящее соединение, операционная система будет будить все наши процессы. В этом месте есть небольшой `overhead`, то есть если мы будем постоянно принимать новые соединения, наш код будет далать небольшой `overhead`, нужно это понимать. Вот так может выглядеть код нашего сервера на процессах."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a684fda3",
"metadata": {},
"outputs": [],
"source": [
"# обработка нескольких соединений одновременно, процессы и потоки\n",
"import socket\n",
"import threading\n",
"import multiprocessing\n",
"\n",
"with socket.socket() as sock:\n",
" sock.bind((\"\", 10001))\n",
" sock.listen()\n",
" \n",
" workers_count = 3\n",
" workers_list = [\n",
" multiprocessing.Process(target=worker, args=(sock,))\n",
" for _ in range(workers_count)\n",
" ]\n",
" \n",
" for w in workers_list:\n",
" w.start()\n",
" \n",
" for w in workers_list:\n",
" w.join()"
]
},
{
"cell_type": "markdown",
"id": "8c2a535a",
"metadata": {},
"source": [
"Итак, как я уже говорил, мы создаем `socket`, вызываем методы `bind` и `listen`. Затем мы должны при помощи модуля `multiprocess` создать наши `worker`'ы, которые будут обрабатывать наши новые соединения. Итак, мы создаем наши `worker`'ы, запускаем их, и ждем, пока они завершатся. Давайте рассмотрим код наших `worker`'ов."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "321c0764",
"metadata": {},
"outputs": [],
"source": [
"# обработка нескольких соединений одновременно, процессы и потоки\n",
"\n",
"def process_request(conn, addr):\n",
" print(f\"connected client: {addr}\")\n",
" with conn:\n",
" while True:\n",
" data = conn.recv(1024)\n",
" \n",
" if not data:\n",
" break\n",
" \n",
" print(data.decode(\"utf8\"))\n",
" \n",
"def worker(sock):\n",
" while True:\n",
" conn, addr = sock.accept()\n",
" print(f\"pid {os.getpid()}\")\n",
" \n",
" th = threading.Thread(target=process_request, args=(conn, addr))\n",
" th.start()"
]
},
{
"cell_type": "markdown",
"id": "dc58479b",
"metadata": {},
"source": [
"Итак, каждый `worker`, который будет запущен в отдельном процессе, делает системный вызов `accept`. Все входящие соединения будут равномерно распределены между `worker`'ами при помощи операционной системы. И после того как соединение попало в наш процесс, необходимо создать поток. Создаем поток, передаем ему метод, в данном случае, `process_request`, и обрабатываем наше соединение.\n",
"\n",
"Таким образом, мы сможем решить проблему с `GIL`, то есть у нас будет уже несколько Python процессов запущенных, и мы сможем решить проблему с памятью, то есть все потоки, которые будут созданы в рамках одного процесса, они будут разделять его память, и переключение между потоками будет более легковесно в данном случае. Таким образом, мы сможем обработать достаточно большое количество входящих соединений.\n",
"\n",
"Итак, на этой лекции мы рассмотрели, как организовать взаимодействие между большим количеством клиентов и одним сервером. Мы рассмотрели несколько подходов, обсудили плюсы и минусы того или иного подхода, а также применили наши знания по процессам и потокам, и рассмотрели, как можно, используя процессы и потоки, получить эффективную обработку сетевых запросов."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}