Дата последнего изменения: 17.01.2024
В ряде случаев при создании своих проектов имеет смысл не использовать какие-либо фреймворки, сервера и так далее, а написать свой код, например, тот же веб-сервер. Такое решение будет самым оптимальным, так как вы сможете учесть все нюансы именно вашей задачи. Это не сложно, но и не просто. Это требует глубокого понимания как устроена операционная система, какие бывают сетевые протоколы, как правильно взаимодействовать с ядром операционной системы через интерфейс системных вызовов.
Обработкой сетевых сокетов должно заниматься ядро операционной системы и уведомлять вас о наступлении события:
На данный момент доминируют другие методы обработки соединений: создаётся куча потоков или процессов (работают медленнее и потребляют значительно больше памяти). Это работает в случаях когда дешевле купить еще одну "железку" чем учить программиста асинхронной обработке демультиплексированных сокетов. Однако когда нужно решить задачу эффективно на текущем оборудовании, сократив издержки на 1-2 порядка, иного способа как научиться понимать суть сетевых процессов и программировать в соответствии с их законами - нет.
Считается, что асинхронная обработка демультипрексированных сокетов - это гораздо сложнее с точки зрения программирования, чем 50 строк в отдельном процессе. Но это не так. Даже на заточенном немного на другие задачи PHP написать быстрый сервер совсем просто.
Рассмотрим пример написания сервера на PHP.
В PHP есть поддержка BSD-сокетов. Но это расширение не поддерживает ssl/tls. Поэтому нужно использовать интерфейс потоков streams. За этим интерфейсом можно увидеть сетевые сокеты и довольно эффективно с ними работать.
Полный исходный код сетевого сервера приводить не будем, отобразим его ключевые части. Сервер устойчиво держит без перекомпиляции PHP до 1024 открытых сокета в одном процессе, занимая около 18-20 МБ и работая в одном процессе операционной системы, загружая "одно" ядро процессора. Если пересобрать PHP, то select может работать с гораздо большим числом сокетов.
Задачи ядра сервера:
То есть в ядро сервера отправляются задания на работу сокетами (например, ходить по сайтам и собирать данные и тому подобное) и ядро в одном процессе начинает выполнять сотни заданий одновременно.
Задание это объект в терминологии ООП типа FSM. Внутри объекта имеется стратегия. Например: "зайди по этому адресу, создай запрос, загрузи ответ, распарси и т.п. возвраты в начало и в конце запиши результат в NoSQL". То есть можно создать задание от простой загрузки содержимого, до сложной цепочки нагрузочного тестирования с многочисленными ветвлениями - и это все будет жить в объекте задания.
Задания в данной реализации ставятся через отдельный управляющий сокет на 8000 порту - пишутся json-объекты в tcp-сокет и затем начинают свое движение в сервером ядре.
Главное: не позволить серверному процессу заблокироваться в ожидании ответа в функции при чтении или записи информации в сетевой сокет, при ожидании нового соединения на управляющий сокет или где-нибудь в сложных вычислениях/циклах. Поэтому все сокеты заданий проверяются в системном вызове select и ядро ОС уведомляет нас лишь тогда, когда событие случается (либо по таймауту).
while (true) { $ar_read = null; $ar_write = null; $ar_ex = null; //Собираемся читать также управляющий сокет, вместе с сокетами заданий $ar_read[] = $this->controlSocket; foreach ($this->jobs as $job) { //job cleanup if ( $job->isFinished() ) { $key = array_search($job, $this->jobs); if (is_resource($job->getSocket())) { //"надежно" закрываем сокет stream_socket_shutdown($job->getSocket(),STREAM_SHUT_RDWR); fclose($job->getSocket()); } unset($this->jobs[$key]); $this->jobsFinished++; continue; } //Задания могут "засыпать" на определенное время, например при ошибке удаленного сервера if ($job->isSleeping()) continue; //Заданию нужно инициировать соединение if ($job->getStatus()=='DO_REQUEST') { $socket = $this->createJobSocket($job); if ($socket) { $ar_write[] = $socket; } //Задание хочет прочитать ответ из сокета } else if ($job->getStatus()=='READ_ANSWER') { $socket = $job->getSocket(); if ($socket) { $ar_read[] = $socket; } //Заданию нужно записать запрос в сокет } else if ( $job->getStatus()=='WRITE_REQUEST' ) { $socket = $job->getSocket(); if ($socket) { $ar_write[] = $socket; } } } //Ждем когда ядро ОС нас уведомит о событии или делаем дежурную итерацию раз в 30 сек $num = stream_select($ar_read, $ar_write, $ar_ex, 30);
Далее, когда событие произошло и ОС уведомила нас, начинаем в неблокирующем режиме обработку сокетов. В коде ниже можно еще немного оптимизировать обход массива заданий, индексировать задания по номеру сокета и выиграть примерно 10мс.
if (is_array($ar_write)) { foreach ($ar_write as $write_ready_socket) { foreach ($this->getJobs() as $job) { if ($write_ready_socket == $job->getSocket()) { $dataToWrite = $job->readyDataWriteEvent(); $count = fwrite($write_ready_socket , $dataToWrite, 1024); //Сообщаем объекту сколько байт удалось записать в сокет $job->dataWrittenEvent($count); } } } } if (is_array($ar_read)) { foreach ($ar_read as $read_ready_socket) { ///// command processing /// //Пришло соединение на управляющий сокет, обрабатываем команду if ($read_ready_socket == $this->controlSocket) { $csocket = stream_socket_accept($this->controlSocket); //Тут упрощение - верим локальному клиенту, что он закроет соединение. Иначе ставьте таймаут. if ($csocket) { $req = ''; while ( ($data = fread($csocket,10000)) !== '' ) { $req .= $data; } //Обрабатываем команду также в неблокирующем режиме $this->processCommand(trim($req), $csocket); stream_socket_shutdown($csocket, STREAM_SHUT_RDWR); fclose($csocket); } continue; /// ///// } else { //Читаем из готового к чтению сокета без блокировки $data = fread($read_ready_socket , 10000); foreach ($this->getJobs() as $job) { if ($read_ready_socket == $job->getSocket()) { //Передаем заданию считанные данные. Если сокет закроется, считаем пустую строку. $job->readyDataReadEvent($data); } } } } } }
Сам сокет инициируется также в неблокирующем режиме, важно установить флаги, причем оба! STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT:
private function createJobSocket(BxRequestJob $job) { //Check job protocol if ($job->getSsl()) { //https $ctx = stream_context_create( array('ssl' => array( 'verify_peer' => false, 'allow_self_signed' => true ) ) ); $errno = 0; $errorString = ''; //Вот тут происходит временами блокировочка в 30-60мс, видимо из-за установки TCP-соединения с удаленным хостом, надо глянуть в исходники, но снова ... лень $socket = stream_socket_client('ssl://'.$job->getConnectServer().':443',$errno,$errorString,30,STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT,$ctx); if ($socket === false) { $this->log(__METHOD__." connect error: ". $job->getConnectServer()." ". $job->getSsl() ."$errno $errorString"); $job->connectedSocketEvent(false); $this->connectsFailed++; return false; } else { $job->connectedSocketEvent($socket); $this->connectsCreated++; return $socket; } } else { //http ...
Код самого задания: оно должно уметь работать с частичными ответами/запросами. Для начала сообщим ядру сервера что мы хотим записать в сокет.
//Формируем тело следующего запроса function readyDataWriteEvent() { if (!$this->dataToWrite) { if ($this->getParams()) { $str = http_build_query($this->getParams()); $headers = $this->getRequestMethod()." ".$this->getUri()." HTTP/1.0\r\nHost: ".$this->getConnectServer()."\r\n". "Content-type: application/x-www-form-urlencoded\r\n". "Content-Length:".strlen($str)."\r\n\r\n"; $this->dataToWrite = $headers; $this->dataToWrite .= $str; } else { $headers = $this->getRequestMethod()." ".$this->getUri()." HTTP/1.0\r\nHost: ".$this->getConnectServer()."\r\n\r\n"; $this->dataToWrite = $headers; } return $this->dataToWrite; } else { return $this->dataToWrite; } }
Теперь пишем запрос, определяя сколько еще осталось.
//Пишем запрос в сокет до того момента, когда полностью запишем его тело function dataWrittenEvent($count) { if ($count === false ) { //socket was reset $this->jobFinished = true; } else { $dataTotalSize = strlen($this->dataToWrite); if ($count<$dataTotalSize) { $this->dataToWrite = substr($this->dataToWrite,$count); $this->setStatus('WRITE_REQUEST'); } else { //Когда успешно записали запрос в сокет, переходим в режим чтения ответа $this->setStatus('READ_ANSWER'); } } }
После получения запроса, читаем ответ. Важно понять, когда ответ прочитан полностью. Возможно понадобиться установить таймаут на чтение.
//Читаем из сокета до момента, когда полностью прочитаем ответ и совершаем переход в другой статус function readyDataReadEvent($data) { ////////// Successfull data read ///// if ($data) { $this->body .= $data; $this->setStatus('READ_ANSWER'); $this->bytesRead += strlen($data); ///// ////////// } else { ////////// А тут мы уже считали ответ и начинаем его парсить ///// ////////// redirect if ( preg_match("|\r\nlocation:(.*)\r\n|i",$this->body, $ar_matches) ) { $url = parse_url(trim($ar_matches[1])); $this->setStatus('DO_REQUEST'); } else if (...) { //Так мы сигнализируем ядру сервера, что задание нужно завершить $this->jobFinished = true; ... } else if (...) { $this->setSleepTo(time()+$this->sleepInterval); $this->sleepInterval *=2; $this->retryCount--; $this->setStatus('DO_REQUEST'); } $this->body = ''; ...
В последнем фрагменте кода можно иерархически направлять FSM по заложенной стратегии, реализуя различные варианты работы задания.
Решить задачу одновременной работы с сотнями и тысячами заданий и сокетов всего в одном процессе PHP можно просто и лаконично. Если поднять для данного сервера сколько процессов PHP, сколько ядер на сервере, то можно вести речь о тысячах обслуживаемых клиентов. Серверный процесс PHP при этом потребляет всего около 20МБ памяти.