95  /  96

Производительный сетевой сервер на PHP

Просмотров: 1878 (Статистика ведётся с 06.02.2017)

В ряде случаев при создании своих проектов имеет смысл не использовать какие-либо фреймворки, сервера и так далее, а написать свой код, например, тот же веб-сервер. Такое решение будет самым оптимальным, так как вы сможете учесть все нюансы именно вашей задачи. Это не сложно, но и не просто. Это требует глубокого понимания как устроена операционная система, какие бывают сетевые протоколы, как правильно взаимодействовать с ядром операционной системы через интерфейс системных вызовов.

Обработкой сетевых сокетов должно заниматься ядро операционной системы и уведомлять вас о наступлении события:

  1. Пришел новый сокет в слушающем соединении (listen) - и его можно взять в обработку (accept)
  2. Можно прочитать из сокета, не блокируя процесс (read).
  3. Можно записать в сокет, не блокируя процесс (write).


На данный момент доминируют другие методы обработки соединений: создаётся куча потоков или процессов (работают медленнее и потребляют значительно больше памяти). Это работает в случаях когда дешевле купить еще одну "железку" чем учить программиста асинхронной обработке демультиплексированных сокетов. Однако когда нужно решить задачу эффективно на текущем оборудовании, сократив издержки на 1-2 порядка, иного способа как научиться понимать суть сетевых процессов и программировать в соответствии с их законами - нет.

Считается, что асинхронная обработка демультипрексированных сокетов - это гораздо сложнее с точки зрения программирования, чем 50 строк в отдельном процессе. Но это не так. Даже на заточенном немного на другие задачи PHP написать быстрый сервер совсем просто.

Рассмотрим пример написания сервера на PHP.

В PHP есть поддержка BSD-сокетов. Но это расширение не поддерживает ssl/tls. Поэтому нужно использовать интерфейс потоков streams. За этим интерфейсом можно увидеть сетевые сокеты и довольно эффективно с ними работать.

Полный исходный код сетевого сервера приводить не будем, отобразим его ключевые части. Сервер устойчиво держит без перекомпиляции PHP до 1024 открытых сокета в одном процессе, занимая около 18-20 МБ и работая в одном процессе операционной системы, загружая "одно" ядро процессора. Если пересобрать PHP, то select может работать с гораздо большим числом сокетов.

Ядро сервера

Задачи ядра сервера:

  1. Проверить массив заданий
  2. Для каждого задания создать соединение
  3. Проверить, что сокет в задании можно прочитать или записать без блокировки процесса
  4. Освободить ресурсы (сокет и т.п) задания
  5. Принять соединение от управляющего сокета на добавление задания — без блокировки процесса

То есть в ядро сервера отправляются задания на работу сокетами (например, ходить по сайтам и собирать данные и тому подобное) и ядро в одном процессе начинает выполнять сотни заданий одновременно.

Задание

Задание это объект в терминологии ООП типа FSM. Внутри объекта имеется стратегия. Например: "зайди по этому адресу, создай запрос, загрузи ответ, распарси и т.п. возвраты в начало и в конце запиши результат в NoSQL". То есть можно создать задание от простой загрузки содержимого, до сложной цепочки нагрузочного тестирования с многочисленными ветвлениями - и это все будет жить в объекте задания.

Задания в данной реализации ставятся через отдельный управляющий сокет на 8000 порту - пишутся json-объекты в tcp-сокет и затем начинают свое движение в сервером ядре.

Принцип обработки заданий и сокетов

Главное: не позволить серверному процессу заблокироваться в ожидании ответа в функции при чтении или записи информации в сетевой сокет, при ожидании нового соединения на управляющий сокет или где-нибудь в сложных вычислениях/циклах. Поэтому все сокеты заданий проверяются в системном вызове select и ядро ОС уведомляет нас лишь тогда, когда событие случается (либо по таймауту).

        while (true) {

            $ar_read = null;
            $ar_write = null;
            $ar_ex = null;

            //Собираемся читать также управляющий сокет, вместе с сокетами заданий
            $ar_read[] = $this->controlSocket;

            foreach ($this->jobs as $job) {

                //job cleanup
                if ( $job->isFinished() ) {

                    $key = array_search($job, $this->jobs);

                    if (is_resource($job->getSocket())) {
                        //"надежно" закрываем сокет
                        stream_socket_shutdown($job->getSocket(),STREAM_SHUT_RDWR);
                        fclose($job->getSocket());
                    }

                    unset($this->jobs[$key]);
                    $this->jobsFinished++;

                    continue;
                }

                //Задания могут "засыпать" на определенное время, например при ошибке удаленного сервера  
                if ($job->isSleeping()) continue;

                //Заданию нужно инициировать соединение
                if ($job->getStatus()=='DO_REQUEST') {

                    $socket = $this->createJobSocket($job);
                    if ($socket) {
                        $ar_write[] = $socket;
                    }
                
                //Задание хочет прочитать ответ из сокета
                } else if ($job->getStatus()=='READ_ANSWER') {

                    $socket = $job->getSocket();
                    if ($socket) {
                        $ar_read[] = $socket;
                    }
                
                //Заданию нужно записать запрос в сокет
                } else if ( $job->getStatus()=='WRITE_REQUEST' ) {

                    $socket = $job->getSocket();
                    if ($socket) {
                        $ar_write[] = $socket;
                    }

                }
            }

            //Ждем когда ядро ОС нас уведомит о событии или делаем дежурную итерацию раз в 30 сек
            $num = stream_select($ar_read, $ar_write, $ar_ex, 30);

Далее, когда событие произошло и ОС уведомила нас, начинаем в неблокирующем режиме обработку сокетов. В коде ниже можно еще немного оптимизировать обход массива заданий, индексировать задания по номеру сокета и выиграть примерно 10мс.

                if (is_array($ar_write)) {
                    foreach ($ar_write as $write_ready_socket) {
                        foreach ($this->getJobs() as $job) {
                            if  ($write_ready_socket == $job->getSocket()) {
                                $dataToWrite = $job->readyDataWriteEvent();
                                $count = fwrite($write_ready_socket , $dataToWrite, 1024);
                                
                                //Сообщаем объекту сколько байт удалось записать в сокет
                                $job->dataWrittenEvent($count);
                            }
                        }
                    }
                }

                if (is_array($ar_read)) {
                    foreach ($ar_read as $read_ready_socket) {

                        ///// command processing
                        ///

                        //Пришло соединение на управляющий сокет, обрабатываем команду
                        if ($read_ready_socket == $this->controlSocket) {
                            $csocket = stream_socket_accept($this->controlSocket);
                            
                             //Тут упрощение - верим локальному клиенту, что он закроет соединение. Иначе ставьте таймаут.
                             if ($csocket) {
                                $req = '';
                                while ( ($data = fread($csocket,10000)) !== '' ) {
                                    $req .= $data;
                                }

                                //Обрабатываем команду также в неблокирующем режиме                                
                                $this->processCommand(trim($req), $csocket);
                                stream_socket_shutdown($csocket, STREAM_SHUT_RDWR);
                                fclose($csocket);
                            }

                            continue;

                            ///
                            /////

                        } else {

                            //Читаем из готового к чтению сокета без блокировки 
                            $data = fread($read_ready_socket , 10000);

                            foreach ($this->getJobs() as $job) {

                                if  ($read_ready_socket == $job->getSocket()) {
                                    
                                    //Передаем заданию считанные данные. Если сокет закроется, считаем пустую строку.
                                    $job->readyDataReadEvent($data);

                                }
                            }
                        }
                    }
                }
            }

Сам сокет инициируется также в неблокирующем режиме, важно установить флаги, причем оба! STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT:

    private function createJobSocket(BxRequestJob $job) {

        //Check job protocol

        if ($job->getSsl()) {
        //https

            $ctx = stream_context_create(
                        array('ssl' =>
                            array(
                                'verify_peer' => false,
                                'allow_self_signed' => true
                            )
                        )
            );

            $errno = 0;
            $errorString = '';

            //Вот тут происходит временами блокировочка в 30-60мс, видимо из-за установки TCP-соединения с удаленным хостом, надо глянуть в исходники, но снова ... лень
            $socket = stream_socket_client('ssl://'.$job->getConnectServer().':443',$errno,$errorString,30,STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT,$ctx);

            if ($socket === false) {
                $this->log(__METHOD__." connect error: ". $job->getConnectServer()." ". $job->getSsl() ."$errno $errorString");
                $job->connectedSocketEvent(false);
                $this->connectsFailed++;
                return false;
            } else {

                $job->connectedSocketEvent($socket);
                $this->connectsCreated++;
                return $socket;
            }

        } else {
        //http
...

Код самого задания: оно должно уметь работать с частичными ответами/запросами. Для начала сообщим ядру сервера что мы хотим записать в сокет.

//Формируем тело следующего запроса

    function readyDataWriteEvent() {

        if (!$this->dataToWrite) {

            if ($this->getParams()) {
                $str = http_build_query($this->getParams());

                $headers = $this->getRequestMethod()." ".$this->getUri()." HTTP/1.0\r\nHost: ".$this->getConnectServer()."\r\n".
                    "Content-type: application/x-www-form-urlencoded\r\n".
                    "Content-Length:".strlen($str)."\r\n\r\n";
                $this->dataToWrite = $headers;
                $this->dataToWrite .= $str;

            } else {
                $headers = $this->getRequestMethod()." ".$this->getUri()." HTTP/1.0\r\nHost: ".$this->getConnectServer()."\r\n\r\n";
                $this->dataToWrite = $headers;
            }

            return $this->dataToWrite;

        } else {
            return $this->dataToWrite;
        }

    }

Теперь пишем запрос, определяя сколько еще осталось.

//Пишем запрос в сокет до того момента, когда полностью запишем его тело
    function dataWrittenEvent($count) {

        if ($count === false ) {

            //socket was reset
            $this->jobFinished = true;

        } else {

            $dataTotalSize = strlen($this->dataToWrite);
            if ($count<$dataTotalSize) {
                $this->dataToWrite = substr($this->dataToWrite,$count);
                $this->setStatus('WRITE_REQUEST');
            } else {
                //Когда успешно записали запрос в сокет, переходим в режим чтения ответа
                $this->setStatus('READ_ANSWER');
            }

        }

    }

После получения запроса, читаем ответ. Важно понять, когда ответ прочитан полностью. Возможно понадобиться установить таймаут на чтение.

//Читаем из сокета до момента, когда полностью прочитаем ответ и совершаем переход в другой статус
    function readyDataReadEvent($data)
    {

        ////////// Successfull data read
        /////

        if ($data)  {
            $this->body .= $data;

            $this->setStatus('READ_ANSWER');

            $this->bytesRead += strlen($data);
        /////
        //////////

        } else {

        ////////// А тут мы уже считали ответ и начинаем его парсить
        /////

                ////////// redirect
                if ( preg_match("|\r\nlocation:(.*)\r\n|i",$this->body, $ar_matches) ) {

                    $url = parse_url(trim($ar_matches[1]));
                    $this->setStatus('DO_REQUEST');

                } else if (...) {
                    
                    //Так мы сигнализируем ядру сервера, что задание нужно завершить
                    $this->jobFinished = true;
                    ...
                } else if (...) {

                    $this->setSleepTo(time()+$this->sleepInterval);
                    $this->sleepInterval *=2;
                    $this->retryCount--;

                    $this->setStatus('DO_REQUEST');
                }

            $this->body = '';
            ...

В последнем фрагменте кода можно иерархически направлять FSM по заложенной стратегии, реализуя различные варианты работы задания.

Итоги

Решить задачу одновременной работы с сотнями и тысячами заданий и сокетов всего в одном процессе PHP можно просто и лаконично. Если поднять для данного сервера сколько процессов PHP, сколько ядер на сервере, то можно вести речь о тысячах обслуживаемых клиентов. Серверный процесс PHP при этом потребляет всего около 20МБ памяти.


12
Курсы разработаны в компании «1С-Битрикс»

Если вы нашли неточность в тексте, непонятное объяснение, пожалуйста, сообщите нам об этом в комментариях.
Развернуть комментарии