Если вы хотя бы раз разрабатывали многопоточное приложение, не важно
под какую ос - думаю вы сталкивались с понятием синхронизации. Т.е. я
не собираюсь досконально разбирать что это такое и для чего нужно -
есть масса руководств по этой теме :). Здесь будет рассматриваться
один конкретный случай синхронизации потоков применительно к ОС Linux.
И так приступим.
Для создания потока используется библиотека pthread и вызов
pthread_create, для синхронизации в этой же библиотеке описаны
специальные объекты - мутексы. Мутекс - это объект который может
принадлежать в некий момент времени только одному потоку и имеющий два
состояния - занят и свободен. Поток пытающийся получить доступ к
мутексу в случае если последний занят будет остановлен системой до
освобождения объекта. На этом собственно и основана синхронизация -
перед использованием общего ресурса потоки сначала обращаются к
мутексу и в конечном счете выстраиваются в очередь.
Все отлично работает - но рассмотрим ситуацию когда есть много потоков
выполняющих только чтение некоего ресурса(они не нуждаются в
синхронизации между собой) и один или несколько потоков выполняющих
изменение этого ресурса. Т.е. синхронизировать нужно этот пишущий
поток со всеми читателями. Если просто использовать мутекс для
синхронизации доступа к ресурсу очевидно что и потоки чтения будут
получать доступ туда последовательно, что в свою очередь замедлит
приложение, фактически превратив его в однопоточное :). В этой
ситуации одним из вариантов решения будет применение семафора. Семафор
- это специальный объект ядра предназначенный для взаимодействия
процессов в системе. Не буду утомлять описанием этого объекта, это
прекрасно сделали до меня, например тут [20]Linux Interprocess
Communications. Упомяну только что в системе создается именованное
множество семафоров(содержащее минимум 1 семафор), каждый семафор
содержит некое количество ресурсов выраженное целым числом. Поток
может запрашивать ресурсы у семафора и естественно должен отдавать их
обратно когда они более не нужны. В случае если запрошенное количество
ресурсов недоступно, но меньше максимального имеющегося количества -
поток ожидает освобождения(в случае если не установлена опция - без
ожидания)- иначе возвращается ошибка. Таким образом семафор более
гибкий механизм синхронизации(однако неприятность в том что это объект
ядра). Попробуем применить его к озвученной выше задаче.
Для облегчения работы рассмотрим объектный интерфейс реализующий
множество семафоров состоящее из одного семафора: В примере 2 файла -
можно безболезненно слить их в один, например sem.cpp :)
/*
* В моей системе данное объединение не объявлено в подключенных файлах -
* если компилятор напишет что оно ранее объявлено - просто сотрите :)
*/
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO
(Linux specific) */
};
class sem
{
private:
int sid; // идентификатор семафора
key_t key; // ключ по которому получаем идентификатор
int res_count; // количество ресурсов у семафора
public:
/*
* кол-во ресурсов
* некое случайное число
* путь в системе - обязательно должен существовать !
*/
sem(int max_res, int id, const char* identify);
/*
* деструктор - тут удаляем семафор, иначе он останется до
* следующей перезагрузки системы или пока его кто-то явно не удалит
*/
~sem();
/*
* обертки для занятия/освобождения ресурсов
*/
bool lock(int res);
bool unlock(int res);
};
typedef sem* psem;
;
Cледующий файл:
/*sem.cpp*/
#include "sem.h"
sem::sem(int max_res, int id, const char* identify)
{
sid = -1;
res_count = 0;
/*
* получаем ключ для семафора
*/
if ((key = ftok(identify,id)) == -1)
{
return;
}
/*
* 0666 - rw для всех, чтобы потом можно было обратиться к семафору
* от любого пользователя системы, в общем то личное дело каждого
* сначала пытаемся открыть имеющийся семафор с таким ключом - и
* удалить его, старый нам ни к чему
*/
if ((sid = semget(key, 0, 0666)) != -1)
{
if (semctl(sid, 0, IPC_RMID, 0) == -1)
{
sid = 0;
}
}
if (sid != 0) // проверяем что семафор был найден и удален или не существовал
{
/*
* создаем с флагом IPC_EXCL - означает что в случае если семафор уже
* имеется - вызов будет провален c значением EEXIST,
* без возврата значения отрытого уже существующего семафора
*/
if ((sid = semget( key, 1, IPC_CREAT | IPC_EXCL | 0666 )) != -1) //
{
union semun semopts;
semopts.val = max_res;
semctl(sid, 0, SETVAL, semopts);
res_count = max_res;
}
} else sid = -1;
}
bool sem::lock(int res)
{
/*
* отсекаем неверные запросы сразу, не используя обращения к структурам семафора
*/
if ((res > res_count) || (sid == -1))
{
return false;
}
/*
* параметры в структуре
* 0 - номер семафора
* количество ресурсов - если захватываем, должно быть отрицательным
* 0 - ждать если на данный момент нет достаточного
* количества ресурсов или IPC_NOWAIT - возвращать ошибку
*/
struct sembuf sem_lock={0,(-1)*res,0};
/*
* параметры запроса
* - идентификатор семафора
* - структура которую заполняли выше - ее адрес
* - сколько раз выполнить операцию
*/
if ((semop(sid, &sem_lock, 1)) == -1)
{
return false;
}
return true;
}
Создадим семафор с 10 ресурсами например, читающие потоки будут
запрашивать по 1 ресурсу за раз, пишущий сразу 10. В итоге читающие
потоки будут блокировать друг друга только в случае если их запущенное
число превысит 10(нужно самостоятельно подбирать нужное значение).
Поток записи же при старте будет гарантированно ждать освобождения
ресурса всеми потоками чтения и заставлять ждать позднее стартующие
потоки окончания своей работы. Хочу обратить внимание на один нюанс -
перед запросом получения ресурса потоком чтения следует ставить запрос
на разрешение обратиться за получением этого самого ресурса. Т.е.
поток чтения в случае если не установлен некий флаг разрешения
зацикливается и ждет пока его установят, только после этого приступая
к запросу. Это нужно для того чтобы потоки чтения не оттесняли поток
записи в конец цикла ожидания - например освобождено уже 9 ресурсов из
10, поток записи ждет, но запускается поток чтения, запрашивает ресурс
и естественное его получает - в итоге 8 свободных ресурсов :), запись
оттесняется. Кстати очень удобно если требуется выполнение какой-либо
работы во время простоя системы :).
/*
* Флаг остановки доступа к ресурсам
*/
bool stop_access = false;
// процедура потока чтения
void* read_proc(void* arg)
{
if (arg == NULL)
{
cerr << "Argument empty n";
return NULL;
}
int pid = getpid();
// ждем разрешения
while(stop_access)
{}
// получаем ресурс
if (((psem)(arg))->lock(1))
{
// типа что-то делаем :)
cout << "reader lock " << pid << endl;
sleep(1);
cout << " unlock " << pid << endl;
if (((psem)(arg))->unlock(1))
{
}
}
return NULL;
}
// процедура потока записи
void* write_proc(void* arg)
{
if (arg == NULL)
{
cerr << "Argument empty n";
return NULL;
}
int pid = getpid();
// запрещаем обращение к ресурсам
stop_access = true;
if (((psem)(arg))->lock(10))
{
stop_access = false;
cout << "writer lock " << pid << endl;
sleep(4);
cout << " unlock " << pid << endl;
if (((psem)(arg))->unlock(10))
{
}
}
return;
}
main(int argc,char* argv[])
{
sem* sm = new sem(10,getpid(),".");
// инициализируем потоки
pthread_attr_t attr;
pthread_attr_init(&attr);
// отсоединенный поток - не ждем его возврата
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
int ret;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE , &ret);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ret);
int i = 0;
pthread_t thread_handle;
// последовательный запуск 8-ми потоков
for (i = 0; i < 8; i++)
{
if (pthread_create(&thread_handle,NULL,&read_proc,sm) != 0)
{
cout << strerror(errno) << endl;
}
// запускаем поток записи на 3-е итерации
if (i == 2)
{
if (pthread_create(&thread_handle,NULL,&write_proc,sm) != 0)
{
cout << strerror(errno) << endl;
}
}
}
// задержка выхода - чтобы увидеть результат
sleep(10);
sm->~sem();
return 0;
}
Заметно что запустившийся поток записи заблокировал ресурс, отработал
и только после этого дал зеленый свет остальным потокам. Потоки чтения
не блокируют друг друга - об этом можно судить по порядку вывода
записей освобождения ресурсов - происходит не в порядке захвата.
Собственно на этом заканчивается мое повествование, для более
подробной информации по семафорам можно пройти по ссылке которая
упоминалась выше. Надеюсь этот материал поможет вам в решении ваших
задач.
1126 Прочтений • [Синхронизация потоков в OC Linux (thread lock gcc)] [08.05.2012] [Комментариев: 0]