Дата последнего изменения: 17.01.2024
В ряде случаев при создании своих проектов имеет смысл не использовать какие-либо фреймворки, сервера и так далее, а написать свой код, например, тот же веб-сервер. Такое решение будет самым оптимальным, так как вы сможете учесть все нюансы именно вашей задачи. Это не сложно, но и не просто. Это требует глубокого понимания как устроена операционная система, какие бывают сетевые протоколы, как правильно взаимодействовать с ядром операционной системы через интерфейс системных вызовов.
Обработкой сетевых сокетов должно заниматься ядро операционной системы и уведомлять вас о наступлении события:

На данный момент доминируют другие методы обработки соединений: создаётся куча потоков или процессов (работают медленнее и потребляют значительно больше памяти). Это работает в случаях когда дешевле купить еще одну "железку" чем учить программиста асинхронной обработке демультиплексированных сокетов. Однако когда нужно решить задачу эффективно на текущем оборудовании, сократив издержки на 1-2 порядка, иного способа как научиться понимать суть сетевых процессов и программировать в соответствии с их законами - нет.
Считается, что асинхронная обработка демультипрексированных сокетов - это гораздо сложнее с точки зрения программирования, чем 50 строк в отдельном процессе. Но это не так. Даже на заточенном немного на другие задачи PHP написать быстрый сервер совсем просто.
Рассмотрим пример написания сервера на PHP.
В PHP есть поддержка BSD-сокетов. Но это расширение не поддерживает ssl/tls. Поэтому нужно использовать интерфейс потоков streams. За этим интерфейсом можно увидеть сетевые сокеты и довольно эффективно с ними работать.
Полный исходный код сетевого сервера приводить не будем, отобразим его ключевые части. Сервер устойчиво держит без перекомпиляции PHP до 1024 открытых сокета в одном процессе, занимая около 18-20 МБ и работая в одном процессе операционной системы, загружая "одно" ядро процессора. Если пересобрать PHP, то select может работать с гораздо большим числом сокетов.
Задачи ядра сервера:
То есть в ядро сервера отправляются задания на работу сокетами (например, ходить по сайтам и собирать данные и тому подобное) и ядро в одном процессе начинает выполнять сотни заданий одновременно.
Задание это объект в терминологии ООП типа FSM. Внутри объекта имеется стратегия. Например: "зайди по этому адресу, создай запрос, загрузи ответ, распарси и т.п. возвраты в начало и в конце запиши результат в NoSQL". То есть можно создать задание от простой загрузки содержимого, до сложной цепочки нагрузочного тестирования с многочисленными ветвлениями - и это все будет жить в объекте задания.
Задания в данной реализации ставятся через отдельный управляющий сокет на 8000 порту - пишутся json-объекты в tcp-сокет и затем начинают свое движение в сервером ядре.
Главное: не позволить серверному процессу заблокироваться в ожидании ответа в функции при чтении или записи информации в сетевой сокет, при ожидании нового соединения на управляющий сокет или где-нибудь в сложных вычислениях/циклах. Поэтому все сокеты заданий проверяются в системном вызове select и ядро ОС уведомляет нас лишь тогда, когда событие случается (либо по таймауту).
while (true) {
$ar_read = null;
$ar_write = null;
$ar_ex = null;
//Собираемся читать также управляющий сокет, вместе с сокетами заданий
$ar_read[] = $this->controlSocket;
foreach ($this->jobs as $job) {
//job cleanup
if ( $job->isFinished() ) {
$key = array_search($job, $this->jobs);
if (is_resource($job->getSocket())) {
//"надежно" закрываем сокет
stream_socket_shutdown($job->getSocket(),STREAM_SHUT_RDWR);
fclose($job->getSocket());
}
unset($this->jobs[$key]);
$this->jobsFinished++;
continue;
}
//Задания могут "засыпать" на определенное время, например при ошибке удаленного сервера
if ($job->isSleeping()) continue;
//Заданию нужно инициировать соединение
if ($job->getStatus()=='DO_REQUEST') {
$socket = $this->createJobSocket($job);
if ($socket) {
$ar_write[] = $socket;
}
//Задание хочет прочитать ответ из сокета
} else if ($job->getStatus()=='READ_ANSWER') {
$socket = $job->getSocket();
if ($socket) {
$ar_read[] = $socket;
}
//Заданию нужно записать запрос в сокет
} else if ( $job->getStatus()=='WRITE_REQUEST' ) {
$socket = $job->getSocket();
if ($socket) {
$ar_write[] = $socket;
}
}
}
//Ждем когда ядро ОС нас уведомит о событии или делаем дежурную итерацию раз в 30 сек
$num = stream_select($ar_read, $ar_write, $ar_ex, 30);
Далее, когда событие произошло и ОС уведомила нас, начинаем в неблокирующем режиме обработку сокетов. В коде ниже можно еще немного оптимизировать обход массива заданий, индексировать задания по номеру сокета и выиграть примерно 10мс.
if (is_array($ar_write)) {
foreach ($ar_write as $write_ready_socket) {
foreach ($this->getJobs() as $job) {
if ($write_ready_socket == $job->getSocket()) {
$dataToWrite = $job->readyDataWriteEvent();
$count = fwrite($write_ready_socket , $dataToWrite, 1024);
//Сообщаем объекту сколько байт удалось записать в сокет
$job->dataWrittenEvent($count);
}
}
}
}
if (is_array($ar_read)) {
foreach ($ar_read as $read_ready_socket) {
///// command processing
///
//Пришло соединение на управляющий сокет, обрабатываем команду
if ($read_ready_socket == $this->controlSocket) {
$csocket = stream_socket_accept($this->controlSocket);
//Тут упрощение - верим локальному клиенту, что он закроет соединение. Иначе ставьте таймаут.
if ($csocket) {
$req = '';
while ( ($data = fread($csocket,10000)) !== '' ) {
$req .= $data;
}
//Обрабатываем команду также в неблокирующем режиме
$this->processCommand(trim($req), $csocket);
stream_socket_shutdown($csocket, STREAM_SHUT_RDWR);
fclose($csocket);
}
continue;
///
/////
} else {
//Читаем из готового к чтению сокета без блокировки
$data = fread($read_ready_socket , 10000);
foreach ($this->getJobs() as $job) {
if ($read_ready_socket == $job->getSocket()) {
//Передаем заданию считанные данные. Если сокет закроется, считаем пустую строку.
$job->readyDataReadEvent($data);
}
}
}
}
}
}
Сам сокет инициируется также в неблокирующем режиме, важно установить флаги, причем оба! STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT:
private function createJobSocket(BxRequestJob $job) {
//Check job protocol
if ($job->getSsl()) {
//https
$ctx = stream_context_create(
array('ssl' =>
array(
'verify_peer' => false,
'allow_self_signed' => true
)
)
);
$errno = 0;
$errorString = '';
//Вот тут происходит временами блокировочка в 30-60мс, видимо из-за установки TCP-соединения с удаленным хостом, надо глянуть в исходники, но снова ... лень
$socket = stream_socket_client('ssl://'.$job->getConnectServer().':443',$errno,$errorString,30,STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT,$ctx);
if ($socket === false) {
$this->log(__METHOD__." connect error: ". $job->getConnectServer()." ". $job->getSsl() ."$errno $errorString");
$job->connectedSocketEvent(false);
$this->connectsFailed++;
return false;
} else {
$job->connectedSocketEvent($socket);
$this->connectsCreated++;
return $socket;
}
} else {
//http
...
Код самого задания: оно должно уметь работать с частичными ответами/запросами. Для начала сообщим ядру сервера что мы хотим записать в сокет.
//Формируем тело следующего запроса
function readyDataWriteEvent() {
if (!$this->dataToWrite) {
if ($this->getParams()) {
$str = http_build_query($this->getParams());
$headers = $this->getRequestMethod()." ".$this->getUri()." HTTP/1.0\r\nHost: ".$this->getConnectServer()."\r\n".
"Content-type: application/x-www-form-urlencoded\r\n".
"Content-Length:".strlen($str)."\r\n\r\n";
$this->dataToWrite = $headers;
$this->dataToWrite .= $str;
} else {
$headers = $this->getRequestMethod()." ".$this->getUri()." HTTP/1.0\r\nHost: ".$this->getConnectServer()."\r\n\r\n";
$this->dataToWrite = $headers;
}
return $this->dataToWrite;
} else {
return $this->dataToWrite;
}
}
Теперь пишем запрос, определяя сколько еще осталось.
//Пишем запрос в сокет до того момента, когда полностью запишем его тело
function dataWrittenEvent($count) {
if ($count === false ) {
//socket was reset
$this->jobFinished = true;
} else {
$dataTotalSize = strlen($this->dataToWrite);
if ($count<$dataTotalSize) {
$this->dataToWrite = substr($this->dataToWrite,$count);
$this->setStatus('WRITE_REQUEST');
} else {
//Когда успешно записали запрос в сокет, переходим в режим чтения ответа
$this->setStatus('READ_ANSWER');
}
}
}
После получения запроса, читаем ответ. Важно понять, когда ответ прочитан полностью. Возможно понадобиться установить таймаут на чтение.
//Читаем из сокета до момента, когда полностью прочитаем ответ и совершаем переход в другой статус
function readyDataReadEvent($data)
{
////////// Successfull data read
/////
if ($data) {
$this->body .= $data;
$this->setStatus('READ_ANSWER');
$this->bytesRead += strlen($data);
/////
//////////
} else {
////////// А тут мы уже считали ответ и начинаем его парсить
/////
////////// redirect
if ( preg_match("|\r\nlocation:(.*)\r\n|i",$this->body, $ar_matches) ) {
$url = parse_url(trim($ar_matches[1]));
$this->setStatus('DO_REQUEST');
} else if (...) {
//Так мы сигнализируем ядру сервера, что задание нужно завершить
$this->jobFinished = true;
...
} else if (...) {
$this->setSleepTo(time()+$this->sleepInterval);
$this->sleepInterval *=2;
$this->retryCount--;
$this->setStatus('DO_REQUEST');
}
$this->body = '';
...
В последнем фрагменте кода можно иерархически направлять FSM по заложенной стратегии, реализуя различные варианты работы задания.
Решить задачу одновременной работы с сотнями и тысячами заданий и сокетов всего в одном процессе PHP можно просто и лаконично. Если поднять для данного сервера сколько процессов PHP, сколько ядер на сервере, то можно вести речь о тысячах обслуживаемых клиентов. Серверный процесс PHP при этом потребляет всего около 20МБ памяти.