프로그래밍 관련/언어들의 코딩들 C++ JAVA C# 등..

C/C++ 스레드 concurrent_vector, concurrent_queue, parallel_for, parallel_for_each 등등 관련

AlrepondTech 2017. 7. 11. 00:19
반응형

 

 

=================================

=================================

=================================

 

 

출처: http://egloos.zum.com/sweeper/v/3053916

 

 
 
1. concurrent_queue
 
1) 주요 특징
  • push(enqueue)와 try_pop(deque)는 쓰레드 세이프하다.

  • iterator를 제공하지만, 겁나게 느리고 쓰레드 세이프하지 않다.
  • size 함수 대신 unsafe_size 함수를 제공하며, 역시 쓰레드 세이프하지 않다.

  • back 메써드를 제공하지 않는다.
 
뭐, concurrent_queue라고 해도 메써드의 종류와 이름만 다르지 그 용도는 queue의 그것과 크게 달라질 게 없다.
 
아래 표는 concurrent_queue의 주요 함수들의 쓰레드 세이프 여부에 대해 정리한 것이다.
 
 
메서드  thread_safe 메서드   thread_safe  메서드   thread_safe
empty O push O get_allocator O
try_pop O clear X unsafe_end X
unsafe_begin X unsafe_size X operator++ X
operator* X operator-> X assign X
 

 
2) 샘플 코드
 
  1. #include <iostream>
  2. #include <tbb/parallel_for.h>
  3. #include <tbb/concurrent_queue.h>
  4.  
  5. int _tmain(int argc, _TCHAR* argv[])
  6. {
  7.     tbb::concurrent_queue<int> queue;
  8.  
  9.     // 멀티쓰레드 환경에서의 삽입 테스트를 위해 parallel_for 사용
  10.     tbb::parallel_for(040[&] (int i)
  11.     {
  12.         queue.push(i);
  13.     });
  14.  
  15.     int num;
  16.     while (queue.try_pop(num))
  17.         std::cout << num << " ";
  18.  
  19.     return 0;
  20. }
 
 
2. concurrent_priority_queue
 
1) 주요 특징
  • concurrent_queue와는 통상의 queue / priority_queue가 가지는 차이와 동일한 차이를 가진다.
  • concurrent_priority_queue에 들어갈 원소는 당연하게도 compare 기능을 제공해야 한다.
 
 
2) 샘플 코드
 
  1. #include <iostream>
  2. #include <tbb/parallel_for.h>
  3. #include <tbb/concurrent_priority_queue.h>
  4.  
  5. int _tmain(int argc, _TCHAR* argv[])
  6. {
  7.     tbb::concurrent_priority_queue<int, std::greater<int> > queue;
  8.  
  9.     // 멀티쓰레드 환경에서의 삽입 테스트를 위해 parallel_for 사용
  10.     tbb::parallel_for(020[&] (int i)
  11.     {
  12.         queue.push(i);
  13.     });
  14.  
  15.     int num;
  16.     while (queue.try_pop(num))
  17.         std::cout << num << " ";
  18.  
  19.     return 0;
  20. }
  21.  

 

 

=================================

=================================

=================================

 

 

 

출처: http://vsts2010.tistory.com/?page=175

 

 

oncurrent_queue는 queue 자료구조와 같이 앞과 뒤에서 접근할 수 있습니다

concurrent_queue는 스레드 세이프하게 enqueue와 dequeue(queue에 데이터를 넣고 빼는) 조작을 할 수 있습니다.

또 concurrent_queue는 반복자를 지원하지만 이것은 스레드 세이프 하지 않습니다.

 

 

 

concurrent_queue와 queue의 차이점

 

concurrent_queue와 queue는 서로 아주 비슷하지만 다음과 같은 다른 점이 있습니다.

( 정확하게는 concurrent_queue와 STL의 deque와의 차이점 이라고 할수 있습니다. )

 

- concurrent_queue enqueue dequeue 조작이 스레드 세이프 하다.

 

- concurrent_queue는 반복자를 지원하지만 이것은 스레드 세이프 하지 않다.

 

- concurrent_queue front pop 함수를 지원하지 않는다

  대신에 try_pop 함수를 대신해서 사용한다.

 

- concurrent_queue back 함수를 지원하지 않는다

  그러므로 마지막 요소를 참조하는 것은 불가능하다.

 

- concurrent_queue size 메소드 대신 unsafe_size 함수를 지원한다

  unsafe_size는 이름 그대로 스레드 세이프 하지 않다.

 

 

 

스레드 세이프한 concurrent_queue의 함수

 

concurrent_queue에 enqueue 또는 dequeue 하는 모든 조작에 대해서는 스레드 세이프합니다.

 

- empty

- push

- get_allocator

- try_pop

 

empty는 스레드 세이프하지만 empty 호출 후 반환되기 전에 다른 스레드에 의해서 queue가 작아지던가 커지는 경우 이 동작들이 끝난 후에 empty의 결과가 반환됩니다.

 

 

 

스레드 세이프 하지 않은 concurrent_queue의 함수

 

- clear

- unsafe_end

- unsafe_begin

- unsafe_size

 

 

 

반복자 지원

 

앞서 이야기 했듯이 concurrent_queue는 반복자를 지원하지만 이것은 스레드 세이프 하지 않습니다그래서 이것은 디버깅 할 때만 사용할 것을 추천합니다.

또 concurrent_queue의 반복자는 오직 앞으로만 순회할 수 있습니다.

 

concurrent_queue는 아래의 반복자를 지원합니다.

 

- operator++

- operator*

- operator->

 

 

concurrent_queue는 앞서 설명한 concurrent_vector와 같이 스레드 세이프한 컨테이너지만 STL vector deque에는 없는 제약 사항도 있습니다우리들이 Vector deque를 스레드 세이프하게 래핑하는 것보다는 Concurrency Runtime에서 제공하는 컨테이너가 성능적으로 더 좋지만 모든 동작이 스레드 세이프하지 않고 지원하지 않는 것도 있으니 조심해서 사용해야 합니다.

 

 

다음에는 일반적인 queue에는 없고 concurrent_queue에서만 새로 생긴 함수에 대해서 좀 더 자세하게 설명하겠습니다.


ps : 앞 주에 Intel의 TBB에 대한 책을 보았습니다. 전체적으로 Concurrency Runtime과 비슷한 부분이 많아서 책을 생각 외로 빨리 볼 수 있었습니다. 제 생각에 TBB나 
Concurrency Runtime를 공부하면 다른 하나도 아주 빠르고 쉽게 습득할 수 있을 것 같습니다.

출처: http://vsts2010.tistory.com/?page=175 [Visual Studio 2010 공식 팀 블로그 @vsts2010]

 

 

=================================

=================================

=================================

 

 

출처: http://vsts2010.tistory.com/165

 

concurrent_vector의 주요 멤버

 

자주 사용하는 것들과 STL vector에 없는 것들을 중심으로 추려 보았습니다.

멤버 스레드 세이프  
at O  
begin O  
back O  
capacity O  
empty O  
end O  
front O  
grow_by O new
grow_to_at_least O new
max_size O  
operator[] O  
push_back O  
rbegin O  
rend O  
size O  
assign X  
clear X  
reserve X  
resize X  
shink_to_fit X new

 

concurrent_vector는 기존 요소의 값을 변경할 때는 스레드 세이프하지 않습니다. 기존 요소의 값을 변경할 때는 동기화 객체를 사용하여 lock을 걸어야 합니다.

 

 

concurrent_vector 사용 방법

 

concurrent_vector를 사용하기 위해서 먼저 헤더 파일을 포함해야 합니다.

concurrent_vector의 헤더 파일은 “concurrent_vector.h” 입니다.

 

concurrent_vector의 사용 방법은 STL vector를 사용하는 방법과 거의 같습니다그러니 STL vector에 없는 것들만 제외하고는 vector를 사용하는 방법을 아는 분들은 따로 공부해야 할 것이 거의 없습니다.

STL vector에 대해서 잘 모르시는 분들은 About STL : C++ STL 프로그래밍(4)-벡터 글을 참고해 주세요.

  

 

 

concurrent_vector 초 간단 사용 예

 

concurrent_vector를 사용한 아주 아주 간단한 예제입니다.^^

 

 

#include <ppl.h>

#include <concurrent_vector.h>

#include <iostream>

 

using namespace Concurrency;

using namespace std;

 

 

int main()

{

           concurrent_vector< int > v1;

           v1.push_back( 11 );

           return 0;

}

 

 

 

 

STL vector에는 없는 grow_by, grow_to_at_least 사용 법

 

grow_by vector의 크기를 확장해 줍니다.

예를 들어 현재 vector의 크기가(size()에 의한) 10인데 이것을 20으로 키우고 싶을 때 사용합니다.

 

원형은 아래와 같습니다.

iterator grow_by( size_type _Delta );

iterator grow_by( size_type _Delta, const_reference _Item );

 

grow_to_at_least는 현재 vector의 크기가 10인데 이것이 20보다 작을 때만 20으로 증가시키고 싶을 때 사용합니다.

원형은 아래와 같습니다.

iterator grow_to_at_least( size_type _N );

 

grow_by와 grow_to_at_least의 반환 값은 추가된 처음 요소의 위치가 반복자입니다.

 

grow_by의 예제 코드입니다.

 

void Append ( concurrent_vector<char>& vector, const char* string) {

    size_t n = strlen(string) + 1;

    memcpy( &vector[vector_grow_by(n)], string, n+1 );

}

 

위 예제는 http://japan.internet.com/developer/20070306/27.html 에서 참고했습니다.

 

 

 

shink_to_fit

 

shink_to_fit는 메모리 사용량과 단편화를 최적화 시켜줍니다. 이것은 메모리 재할당을 하기 때문에 요소에 접근하는 모든 반복자가 무효화됩니다.

 

 

Intel TBB

 

CPU로 유명한 Intel에서는 멀티코어 CPU를 만들면서 병렬 프로그래밍을 좀 더 쉽고안전화고확장성 높은 프로그램을 만들 수 있도록 툴과 라이브러리를 만들었습니다.

라이브러리 중 TBB라는 병렬 프로그래밍 용 라이브러리가 있습니다아마 TBB를 아시는 분이라면 Concurrent Runtime PPL에 있는 것들이 TBB에 있는 것들과 비슷한 부분이 많다라는 것을 아실 것입니다.

VSTS 2010 Beta2가 나온지 얼마 되지 않아서 병렬 컨테이너에 대한 문서가 거의 없습니다그러나 TBB에 관한 문서는 검색을 해보면 적지 않게 찾을 수 있습니다. concurrent_vector에 대해서 좀 더 알고 싶은 분들은 Intel TBB에 대해서 알아보시면 좋을 것 같습니다.

참고로 TBB 관련 서적이 한국어로 근래에 출간되었습니다.  http://kangcom.com/sub/view.asp?sku=200911100001 )

 

 

다음에는 concurrent_queue에 대해서 알아 보겠습니다.



출처: http://vsts2010.tistory.com/165 [Visual Studio 2010 공식 팀 블로그 @vsts2010]

 

 

 

=================================

=================================

=================================

 

 

출처: http://vsts2010.tistory.com/193

 

 

concurrent_queue는 사용 용도가 concurrent_vector 보다 더 많을 것 같아서 좀 더 자세하게 설명하겠습니다.

 

온라인 서버 애플리케이션의 경우 Producer-Consumer 모델이나 이와 비슷한 모델로 네트웍을 통해서 받은 패킷을 처리합니다즉 스레드 A는 네트웍을 통해서 패킷을 받으면 Queue에 넣습니다그리고 스레드 B Queue에서 패킷을 꺼내와서 처리합니다이 때 Queue는 스레드 A B가 같이 사용하므로 공유 객체입니다공유 객체이므로 패킷을 넣고 뺄 때 크리티컬섹션과 같은 동기 객체로 동기화를 해야 합니다이런 곳에 concurrent_queue를 사용하면 아주 좋습니다.

 

 

concurrent_queue를 사용하기 위한 준비 단계

 

너무 당연하듯이 헤더 파일과 네임스페이스를 선언해야 합니다.

 

헤더파일

 

#include <concurrent_queue.h>

 

 

네임스페이스

 

using namespace Concurrency;

 

을 선언합니다.

 

이제 사전 준비는 끝났습니다. concurrent_queue를 선언한 후 사용하면 됩니다.

 

concurrent_queue< int > queue1;

 

 

 

 

concurrent_queue에 데이터 추가

 

concurrent_queue에 새로운 데이터를 넣을 때는 push 라는 멤버를 사용합니다.

 

원형

 

void push( const _Ty& _Src );

 

 

STL deque push_back과 같은 사용 방법과 기능도 같습니다다만 스레스 세이프 하다는 것이 다릅니다. concurrent_queue는 앞 회에서 이야기 했듯이 스레드 세이프한 컨테이너이므로 제약이 있습니다그래서 deque 와 다르게 제일 뒤로만 새로운 데이터를 넣을 수 있습니다.

  

 

concurrent_queue< int > queue1;

queue1.push( 11 );

 

 

 

 

concurrent_queue에서 데이터 가져오기

 

데이터를 가져올 때는 try_pop 멤버를 사용합니다앞의 push의 경우는 STL deque와 비슷했지만 try_pop은 꽤 다릅니다.

 

원형

 

bool try_pop( _Ty& _Dest );

 

 

try_pop을 호출 했을 때 concurrent_queue에 데이터가 있다면 true를 반환하고 _Dest에 데이터가 담기며 concurrent_queue에 있는 해당 데이터는 삭제됩니다그러나 concurrent_queue에 데이터가 없다면 false를 즉시 반환하고 _Dest에는 호출했을 때의 그대로 됩니다.

 

 

concurrent_queue< int > queue1;

 

queue1.push( 12 );

queue1.push( 14 );

 

int Value = 0;

 

if( queue1.try_pop( Value ) )

{

           // queue1에서 데이터를 가져왔음

}

else

{

           // queue1은 비어 있었음.

}

 

 

 

 

concurrent_queue가 비어 있는지 검사

 

concurrent_queue가 비어 있는지 알고 싶을 때는 empty()를 사용합니다이것은 STL deque와 같습니다.

 

원형

 

bool empty() const;

 

 

비어 있을 때는 true를 반환하고 비어 있지 않을 때는 false를 반환합니다다만 empty를 호출할 때 비어 있는지 검사하므로 100% 정확하지 않습니다. 100% 정확하지 않다라는 것은 empty push, try_pop 이 셋은 스레드 세이프하여 동시에 사용될 수 있으므로 empty를 호출할 시점에는 데이터가 있어서 false를 반환했지만 바로 직후에 다른 스레드에서 try_pop으로 삭제를 해버렸다면 empty 호출 후 false를 반환했어 try_pop을 호출했는데 false가 반환 될 수 있습니다.

 

 

 

concurrent_queue에 있는 데이터의 개수를 알고 싶을 때

 

concurrent_queue에 있는 데이터의 개수를 알고 싶을 때는 unsafe_size 멤버를 사용합니다.

 

원형

 

size_type unsafe_size() const;

 

 

이것은 이름에서도 알 수 있듯이 스레드 세이프 하지 않습니다그래서 unsafe_size를 호출할 때 push try_pop이 호출되면 unsafe_size를 통해서 얻은 결과는 올바르지 않습니다.

 

 

 

concurrent_queue에 있는 데이터 순차 접근

  

concurrent_queue에 있는 데이터를 모두 순차적으로 접근하고 싶을 때는 unsafe_begin unsafe_end를 사용합니다.

 

원형

 

iterator unsafe_begin();

const_iterator unsafe_begin() const;

 

iterator unsafe_end();

const_iterator unsafe_end() const;

 

 

unsafe_begin을 사용하여 선두 위치를 얻고, unsafe_end를 사용하여 마지막 다음 위치(미 사용 영역)를 얻을 수 있습니다이것도 이름에 나와 있듯이 스레드 세이프 하지 않습니다.

 

 

 

모든 데이터 삭제


모든 데이터를 삭제할 때는 clear를 사용합니다이것은 이름에 unsafe라는 것이 없지만 스레드 세이프 하지 않습니다.

 

원형

 

template< typename _Ty, class _Ax >

void concurrent_queue<_Ty,_Ax>::clear();

 

 

 

 

제 글을 보는 분들은 C++을 알고 있다는 가정하고 있기 때문에 STL을 알고 있다고 생각하여 아주 간단하게 concurrent_queue를 설명 하였습니다.

 

concurrent_queue 정말 간단하지 않습니까전체적으로 STL deque와 비슷해서 어렵지 않을 것입니다다만 스레드 세이프 하지 않은 것들이 있기 때문에 이것들을 사용할 때는 조심해야 된다는 것만 유의하면 됩니다.

 

이것으로 Concurrency Runtime PPL에 대한 설명은 일단락 되었습니다.

이후에는 Concurrency Runtime의 다른 부분을 설명할지 아니면 Beta2에서 새로 추가된 C++0x의 기능이나 또는 이전에 설명한 것들을 더 깊게 설명할지 고민을 한 후 다시 찾아 뵙겠습니다.^^

 

 

 

참고

Producer-Consumer 모델 : 자바워크님의 http://javawork.egloos.com/2397148

MSDN concurrent_queue :

http://msdn.microsoft.com/en-us/library/dd504906(VS.100).aspx#queue



출처: http://vsts2010.tistory.com/193 [Visual Studio 2010 공식 팀 블로그 @vsts2010]

 

 

 

=================================

=================================

=================================

 

 

 

출처: http://vsts2010.tistory.com/163

 

 

Visual Stuido 2010 Beta2가 나오면서 제가 기대하고 있었던 병렬 컨테이너가 드디어 구현되었습니다.

 

Concurrency Runtime(이하 ConRT)에는 총 3개의 병렬 컨테이너를 제공합니다. Beta2에서는 모두 다 구현되지는 못하고 concurrent_vector concurrent_queue 두 개가 구현되었습니다아직 구현되지 않은 것은 concurrent_hash_map 입니다.

 

세 개의 컨테이너들은 C++ STL의 컨테이너 중에서 가장 자주 사용하는 것으로 vector, deque, hash_map 컨테이너의 병렬 버전이라는 것을 이름을 보면 쉽게 알 수 있을 것입니다.

 

STL에 있는 컨테이너와 비슷한 이름을 가진 것처럼 사용 방법도 기존의 컨테이너와 비슷합니다다만 병렬 컨테이너 답게 스레드 세이프하며기존의 컨테이너에서 제공하는 일부 기능을 지원하지 못하는 제한도 있습니다.

 

 

몇 회에 나누어서 concurrent_vector concurrent_queue에 대해서 설명해 나가겠습니다.

이번에는 첫 번째로 concurrent_vector에 대한 것입니다.

 

 

 

concurrent_vector란?

 

STL vector와 같이 임의 접근이 가능한 시퀀스 컨테이너입니다. concurrent_vector는 멀티 스레드에서 요소를 추가하던가 특정 요소에 접근해도 안전합니다반복자의 접근과 순회는 언제나 멀티 스레드에서 안전해야 하므로 요소를 추가할 때는 기존의 인덱스와 반복자를 무효화 시키면 안됩니다.

 

 

concurrent_vector vector의 차이점

 

기능 vctor Concurrent_vector
추가 스레드에 안전하지 않음 스레드에 안전
요소에 접근 스레드에 안전하지 않음 스레드에 안전
반복자 접근 및 순회 스레드에 안전하지 않음 스레드에 안전
push_back 가능 가능
insert 가능 불가능
clear 모두 삭제 모두 삭제
erase 가능 불가능
pop_back 가능 불가능
배열식 접근 예. &v[0]+2 가능 불가능
    grow_by, grow_to_at_least (vector resize와 비슷)는 스레드에 안전
    추가 또는 resize 때 기존 인덱스나 반복자의 위치가 바뀌지 않음
    bool 형은 정의 되지 않았음

 


concurrent_vector에 대한 설명을 이번에는 소개 정도로 끝내고 다음부터는 본격적으로 Concurrent_vector을 어떻게 사용하면 되는지 상세하게 설명해 나가겠습니다.^^

출처: http://vsts2010.tistory.com/163 [Visual Studio 2010 공식 팀 블로그 @vsts2010]

 

 

 

반응형

 

728x90

 

=================================

=================================

=================================

 

 

 

출처: http://cosmosnet.tistory.com/entry/%EB%B3%91%EB%A0%AC-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-%EC%99%84%EC%A0%84-%EC%A0%95%EB%B3%B5-5-%EB%B3%91%EB%A0%AC-%EC%95%8C%EA%B3%A0%EB%A6%AC%EC%A6%98-1

 

 

데브피아 김경진님 작성 (http://devmachine.blog.me/179227204)

 

 

병렬 알고리즘

 

PPL에서는 데이터 집합(배열, vector 등등..)을 기반으로 작업을 동시에 처리할 수 있는 알고리즘 함수들을 제공합니다. 이런 병렬 알고리즘 함수들은 기존에 STL에서 사용하던 알고리즘 함수와 사용 방법이 거의 비슷하다는 장점을 가지고 있습니다. 이러한 장점 때문에 STL 알고리즘 함수를 사용해 본 경험이 있는 분들은 별 다른 어려움 없이 코드를 이해할 수 있을 뿐만 아니라 기존에 STL 알고리즘을 이용하여 구현된 코드를 PPL의 병렬 알고리즘으로 쉽게 변환할 수 있습니다. PPL의 병렬 알고리즘은 병렬 프로그래밍과 스레드에 대한 특별한 기반 지식 없이도 쉽게 사용할 수 있기 때문에 아마도 이번 강좌가 제 병렬 프로그래밍 강좌 중에서 가장 쓰임새 있는 강좌가 되지 않을까 싶네요.

 

 

parallel_for

 

가장 먼저 소개해 드릴 함수는 parallel_for 함수입니다. 이름에서 이미 예상할 수 있듯이 for 루프의 내용을 병렬로 실행해주는 함수이지요. parallel_for 함수는 루프 안에서 자원을 공유하지 않고 서로 독립적인 작업을 할 경우에 유용하게 사용될 수 있습니다. parallel_for 함수를 비롯해 이후에 설명하게될 대부분의 병렬 알고리즘 함수는 내부적으로 (지난 강좌에서 배운) structured_task_group 클래스를 이용하여 구현되어 있으며 work-stealing 알고리즘 등을 이용하여 최적화된 방법으로 작업을 분할하여 균형있게 처리합니다. 그러므로 사용자들은 몇 개의 스레드가 실행되고 어떻게 작업이 분배되는지 전혀 알 필요가 없습니다. 단지 작업이 '동시에' 처리될 수 있다는 점만 유념하여 코드를 구현해주면 됩니다.

 

parallel_for 함수에는 몇 가지 오버로드 버전이 존재하는데 그 중 가장 대표적인 버전은 첫 번째 파라미터에 시작 인덱스를, 두 번째 파라미터에는 종료 인덱스를 전달하고 세 번째 파라미터에 작업 함수(함수형)를 전달하여 호출합니다. 또 다른 오버로드 버전은 시작 인덱스와 종료 인덱스 뒤로 증가될 값의 크기(Step)를 전달하고 마지막에 작업 함수를 전달하여 호출합니다.

 

기존의 for 루프를 병렬로 처리하기 위한 대체 수단으로 parallel_for 함수를 이용할 수 있지만, for 루프와 parallel_for 함수의 차이점을 유의하여 작성하여야 합니다. 그 차이점을 정리해보면 다음과 같습니다.

 

  • parallel_for 함수가 처리하는 작업의 실행 순서는 정해져 있지 않습니다.
 
  • for 루프에서는 break를 통해 도중에 루프를 빠져나올 수 있지만 parallel_for 함수는 모든 반복 작업이 끝난 이후에 종료됩니다.
 
  • 시작 인덱스와 종료 인덱스는 반드시 정수형(양수, 음수)이어야 합니다.
 
  • 루프의 진행은 순방향으로만 가능합니다. 이는 증가될 값의 크기(Step)가 음수값이 될 수 없음을 의미합니다. 만약 Step 값이 1보다 작으면 std::invalid_argument 예외가 발생합니다.
 
parallel_for 함수의 임의 중단은 task 그룹의 취소를 이용하여 구현할 수 있으며 이는 이후 Cancellation 강좌에서 자세히 다루도록 하겠습니다. 그럼 이제 기다리던 예제를 한 번 살펴 보도록 하죠.

 

// 병렬 알고리즘을 사용하려면 ppl.h 파일을 포함시켜야 합니다.
#include <ppl.h>
#include <array>

#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
// 정수값을 담는 배열을 생성합니다.
array<int, 5> values1 = { 1, 2, 3, 4, 5 };
array<int, 5> values2 = { 2, 3, 4, 5, 6 };
array<int, 5> results;

// values1 배열의 값과 values2 배열의 값을 더하여 results 배열에 저장합니다.
parallel_for(0, 5, [&](int index) {
results[index] = values1[index] + values2[index];
});

// results 배열의 값을 출력합니다.
for (int i = 0; i < 5; ++i)
{
wcout << results[i] << L" ";
}
}

 

3 5 7 9 11

 

 

이 예제에서는 정수를 담고있는 두 배열에서 같은 인덱스에 해당하는 원소의 합을 또 다른 배열에 담아 출력하는 것을 parallel_for 함수를 이용하여 구현하고 있습니다. 여기서는 parallel_for 함수의 시작 인덱스를 0으로 설정하고, 종료 인덱스를 5로 설정하였는데 람다 함수의 index 변수에 전달되는 값이 0~5 값이 아닌 0~4 값이라는 것만 주의하시면 됩니다.

 

parallel_for 함수는 기존의 for 루프와 마찬가지로 중첩 루프도 구현이 가능합니다. 작업 함수에 람다 함수를 사용하면 기존 방식 그대로 직관적인 방법으로 중첩된 루프를 구현할 수 있죠. 그럼 이번엔 parallel_for 함수를 중첩 루프로 구현한 예제를 하나 더 만들어보도록 하죠.

 

#include <ppl.h>
#include <array>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
// 구구단 테이블 생성합니다.
array<array<int, 9>, 8> multi_table;

// parallel_for 함수를 중첩하여 사용하여 구구단 테이블을 병렬로 채웁니다.
parallel_for(2, 10, [&](int i) {
parallel_for(1, 10, [&](int j) {
multi_table[i - 2][j - 1] = i * j;
});
});

// 구구단 테이블을 출력합니다.
for (auto it1 = multi_table.begin(); it1 != multi_table.end(); ++it1)
{
for (auto it2 = it1->begin(); it2 != it1->end(); ++it2)
{
wcout << *it2 << L" ";
}
wcout << endl;
}
}

 

2 4 6 8 10 12 14 16 18 
3 6 9 12 15 18 21 24 27
4 8 12 16 20 24 28 32 36 
5 10 15 20 25 30 35 40 45
6 12 18 24 30 36 42 48 54
7 14 21 28 35 42 49 56 63
8 16 24 32 40 48 56 64 72 
9 18 27 36 45 54 63 72 81

 

 

구구단 코드를 병렬 알고리즘을 이용해 만들어보니 또 감회가 새롭네요. ^^; 이 예제에서는 구구단 테이블의 값을 중첩된 parallel_for 함수를 이용하여 채우고 있습니다. 어떤가요..기존 for 루프와 전혀 다르지 않죠? 이어서 설명할 parallel_for_each 함수도 중첩에 관한 내용은 parallel_for 함수와 같으므로 그 부분에 대해서는 따로 설명드리지 않도록 하겠습니다.

 

 

parallel_for_each

 

parallel_for_each 함수는 iterator 기반의 컨테이너와 함께 사용하며 컨테이너의 범위를 지정하여 지정된 범위 안의 데이터를 병렬로 처리하는 함수입니다. parallel_for_each 함수의 동작 방식은 이미 살펴본 parallel_for와 유사하며 내부적으로 같은 알고리즘을 사용하여 구현되어 있습니다. parallel_for_each 함수는 데이터를 병렬로 처리하기 때문에 실행 순서를 보장받을 수 없다는 것을 제외하면 STL의 알고리즘 함수인 for_each 함수와 매우 유사합니다. 우리는 이미 for_each 함수에 익숙하기 때문에 parallel_for_each 함수 역시 어렵지 않게 작성할 수 있죠. 그럼 바로 예제를 살펴보겠습니다.

 

#include <ppl.h>
#include <array>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
// 정수값을 담는 배열을 생성합니다.
array<int, 5> values = { 1, 2, 3, 4, 5 };

// 배열에 담긴 값을 병렬로 출력합니다.
parallel_for_each(begin(values), end(values), [](int value) {
wstringstream ss;
ss << value << L' ';
wcout << ss.str();
});
}

 

4 5 1 2 3

 

 

이번 예제는 MSDN 예제를 그대로 가져와 봤습니다. 코드를 봐도 딱히 설명할 것이 없는 초간단 예제네요. 병렬 프로그래밍이 이렇게 쉬워질 수 있다니... 참으로 고마운 일입니다. ^^ 작업의 실행 순서가 보장 되지 않기 때문에 배열 값의 출력 순서가 실행 할 때 마다 달라질 수 있다는 것만 유의하시면 될 것 같네요.

 

지금까지 parallel_for 함수와 parallel_for_each 함수에 대하여 알아보았습니다. 두 함수는 PPL 중에서도 가장 쉬운 방법으로 병렬 프로그래밍을 구현할 수 있는 도구가 아닌가 생각이 됩니다. 이 강좌를 통해서 많은 분들이 병렬 프로그래밍은 어렵지 않다는 것을 알게 되었으면 좋겠네요. 다음 강좌에는 parallel_invoke 함수를 비롯한 나머지 병렬 알고리즘에 대하여 알아보도록 하겠습니다.

 

 

Reference

 

Parallel Algorithms

 

출처: http://cosmosnet.tistory.com/entry/병렬-프로그래밍-완전-정복-5-병렬-알고리즘-1 [Creative Motive]

 

 

=================================

=================================

=================================

 

 

 

 

 

출처: http://daniel00.tistory.com/entry/concurrentqueue

 

 

 

전자쟁이가 MFC를 하다보니 순수한 소프트웨어 어플리케이션 보다는 하드웨어를 제어하는 목적의 소프트웨어를 개발하고 있다.

주로 외부머신(MCU/DSP/FPGA 등)의 데이터를 수집하여 가공하고 디스플레이 해주고 제어 커맨드를 날려주는 등의 소프트웨어 이다.

그렇다 보니 머신에서 날라오는 데이터를 쉼없이 처리하여야 하는 일이 빈번하다.

이를 위하여 , 머신에서 날라오는 입력데이터를 일단 큐에 담아놓고 하나씩 빼면서 어떤 데이터인지 분석하고 그에 맞는 처리를 하고 있다. 큐의 Head에서는 데이터를 빼가고 Tail에는 데이터를 추가하는 형태이다.

실제로는 스레드 두개를 돌리면서 한넘은 쌓고 한넘은 빼가는 로직이다.

처음에는 배열형태로 고정크기의 원형큐를 구현하여 사용하였었는데 데이터량이 많아지고 처리해야할 일이 많아지다 보니 큐에 쌓는 속도보다 큐에서 읽어가는 속도가 느려져서 결국 큐 오버플로우가 발생하여 데이터가 누락되는 경우가 발생하였다. 

그래서 대안으로 CList를 사용하여 링크드 리스트 형태로 큐를 구현하여 큐가 쌓이더라도 오버플로우는 발생하지 않도록 임시 방편을 취하였다. 하지만 이 또한 문제가 있는것이 만에하나 계속 쌓이는 경우가 발생하면 결국은 프로그램이 뻗게 될것이고 또한 배열이던 리스트던 큐는 읽고 쓰는 스레드두개의 공유 메모리이기 때문에 동시 접근이 불가하므로 크리티컬섹션으로 메모리 동시접근을 막는 형태로 사용 할 수 밖에 없기에 스레드 스위칭에 따른 지연발생은 불가피 하였다.

이런 문제를 해결하고자 큐에 동시접근하는 방법을 고민하여 정보를 찾던중 concurrent_queue라는 것을 알게되었다.

Visual Studio 2010부터 지원되는 PPL(Parallel Pattern Library)에 포함된 템플릿 클래스로 큐를 읽고 쓰는 작업을 동시에 수행 할 수 있으며 또한 스레드 세이프하게 구현된 큐를 사용 할 수 있도록 하여주는 클래스이다.

 

오!~ 사용하여 보니 너무나 좋다.

사용법 간편하고 속도 빠르다. 동시에 읽고 쓰는 작업이 필요한 병렬처리 로직에 아주 제격이다.

간단한 테스트로 바이트스트림 십만개를 스레드 두개를 이용하여 단지 쓰고 /읽는 작업만을 하여 그 시간을 측정하여 보았는데, 결과는

 

concurrent_queue         :  약     62ms

일반 리스트로 구현한 큐 :  약  1077ms

 

10배 이상의 차이가 난다.

물론 다른 로직을 잘 설계하여야 하겠지만 단지 큐를 쓰고 읽는데에 허비되는 시간은 많이 줄일 수 있을 것 같다.

나와 비슷한 고민을 하고 있는 공돌이 분들께 도움이 되었으면 한다.

첨부에 코드를 첨부하였다. 본문에는 코드 일부만 (Visual Studio 2013으로 구현)

 

*주의*

try_pop()을 스레드에서 호출할 경우 try_pop()에 실패하는 경우 즉, 큐에 데이터가 없는 경우에도 스레드가 계속 돌면서 CPU점유율을 잡아먹는 경우가 발생한다. CPU를 쉬게하는것은 바보같은 일이지만 이같은 경우 처럼 불필요한 루프를 돌리느라 CPU를 점유시킨다는건 더더욱 바보같은 일이다. 따라서 큐에 데이터가 없을 경우에는 CPU를 잠시 다른 프로세스&스레드에게 양보하도록 하자.

Sleep(0)을 줄경우 Sleep()을 호출한 스레드보다 우선순위가 높은 스레드가 없으면 다시 자기 스레드를 돌리는 경우가 발생 할 수 있다.

Sleep(1)은 우선순위에 상관없이 대기중인 다른 스레드에게 무조건 양보한다.

따라서, 본인의 로직을 잘 점검하여 Sleep(0)을 쓸지 Sleep(1)을 쓸지 자~알 결정 하여야 한다.

UINT ThreadPop(LPVOID lpParam)

{

while(1)

{

if(queue.try_pop(data))

{

//data처리

}

else

{

Sleep(1);    //CPU를 다른 스레드에게 양보하자.

}

}

}

 

 

 

 

 

 

 

 

 

<동작설명>

 

concurrent_queue.zip
다운로드

 

 

 

- #define CONCURRENT 의 주석여부로 일반 리스트를 사용하는 경우와 concurrent_queue를 사용하는 경우를 구분

- 버튼을 누르면, 한쪽 스레드에서는 큐에 데이터를 쓰고 다른 스레드에서는 데이터를 읽는다(50000개)

 

//아래 디파인을 주석처리 하면 일반 리스트를 사용한 큐 사용

//주석 풀리면 concurrent_queue 사용

 

#define CONCURRENT

UINT nSize = 50000000;

 

UINT ThreadPush(LPVOID lpParam)

{

Cconcurrent_queueDlg* pDlg = (Cconcurrent_queueDlg*)theApp.m_pMainWnd;

 

#ifdef CONCURRENT

pDlg->SetWindowTextW(_T("concurrent_queue를 이용한 큐"));

 

for (int i = 1; i <= nSize; i++)

{

pDlg->queue1.push(i);

}

 

#else

pDlg->SetWindowTextW(_T("CList를 이용한 큐"));

 

for (int i = 1; i <= nSize; i++)

{

pDlg->cs.Lock();

 

pDlg->queue2.AddTail(i);

 

pDlg->cs.Unlock();

}

#endif

 

pDlg->m_pThreadPush = NULL;

 

return 0;

}

 

UINT ThreadPop(LPVOID lpParam)

{

Cconcurrent_queueDlg* pDlg = (Cconcurrent_queueDlg*)theApp.m_pMainWnd;

int num = 0;

CString strNum;

CString strDelta = _T("");

 

DWORD start, end;

 

//pDlg->m_editOutWnd.SetSel(0, -1);

//pDlg->m_editOutWnd.Clear();

 

 

#ifdef CONCURRENT

while (1)

{

if (pDlg->queue1.try_pop(num))

{

if (num == 1)

{

start = GetTickCount();

}

else if (num == nSize)

{

end = GetTickCount();

 

strDelta.Format(_T("Delta: %d, Last Num: %d \r\n"), end - start, num);

//pDlg->m_editOutWnd.ReplaceSel(strDelta);

AfxMessageBox(strDelta);

}

//break;

}

else

{

Sleep(1);

}

 

//Sleep(0);

}

 

#else

while (num<nSize)

{

if (pDlg->queue2.GetCount() > 0) //RemoveHead() 함수는 스레드 세이프하지 않으므로 이렇게 사이즈를 점검 하여야 한다

{

pDlg->cs.Lock();

 

num = pDlg->queue2.RemoveHead();

//strNum.Format(_T("%d\r\n"), num);

//pDlg->m_editOutWnd.ReplaceSel(strNum);

 

pDlg->cs.Unlock();

}

}

#endif

 

pDlg->m_pThreadPop = NULL;

 

end = GetTickCount();

 

strDelta.Format(_T("Delta: %d, Last Num: %d \r\n"), end-start,num);

//pDlg->m_editOutWnd.ReplaceSel(strDelta);

AfxMessageBox(strDelta);

 

return 0;

}

 

 



출처: http://daniel00.tistory.com/entry/concurrentqueue [공돌이(Engineer+Footballer) 세상]

 

=================================

=================================

=================================

 

 

반응형