Wednesday, March 6, 2013

Pig + Hive, not Pig or Hive

Hadoop ecosystem에서 Data processing 을 위해서 Pig와 Hive가 탄생한지도 시간이 많이 되었다. 
많은 사람들(??)이 Pig와 Hive를 사용하지만 대부분 둘 다 비슷한거 아니냐는 질문을 많이 하고(심지어 팀에서는 pig와 hive로 같은 data pipeline의 performance를 테스트 해본경우도 있다;; 삽질), 나 역시 처음에는 sql에 익숙하면 Hive, 아니면 Pig라고 생각했었다. 1년간 Pig를 나름 유용하게 사용해오면서 지금은 Pig or Hive가 아닌 Pig + Hive여야 한다는 주장을 하려고(대부분 이렇게 생각하나??)한다. 


첫 번째로 해야 할 질문은 그럼 이 두 가지 멋진 tool이 무슨 문제를 해결 하려고 고민 하다가 생긴 것일까 이다.
모든 마찬가지지만 문제를 이해하는 것이 가장 중요하므로 문제를 자세히 살펴보자.
한마디로 문제는 data processing, 좀 더 설명하면 아래와 같다.

Data processing
  1. data collection: 데이터를 여러 source로부터 모은다. 
  2. data preparation(ETL, data factory): raw data로 부터 data user가 사용 할 수 있도록 data sets을 생성한다.
  3. data presentation(DW): users이 off of the shelves로 data를 select해볼 수 있도록 data sets을 저장해 놓는다.
Pig와 Hive는 위에서 2,3번, 즉 ETL과 DW단계를 위해서 만들어 졌다라고 볼 수 있다(이 단계들에 가장 적합하다라고 하는게 더 정확 할 수 있다).

그럼 ETL과 DW는 몬가? 왜 ETL은 Pig로, DW는 Hive로 라고 주장하는 지 설명하기 위해 ETL과 DW의 간단한(나도해본??) use cases를 살펴 보자.
먼저 ETL
  1. pipelines: data의 stream을 feed, clean, transform해서 DW에 적재 한다.
  2. iterative processing: very large data set와 매일 들어오는 small new data간의 join.
  3. research: unknown schemas와 unstructured data들을 원하는 data로 transform한다.
그러면 DW는?
  1. business-intelligence analysis: data를 BI와 연결한다.
  2. ad-hoc queries for analytics
일단 DW에 대한 선택은 명확히 Hive이므로 생략한다.
그럼 비교할 건 ETL을 Hive로 하는 것과 Pig로 하는 것이다.
결론 부터 말하면 ETL은 Pig가 더 적합하다. Hive로도 ETL을 할 수 있고 하는 경우를 많이(??) 보았지만 Pig가 ETL에서 더 적합한 이유에 대해 설명해 보고자 한다. 원글은 pig의 PMC인 yahoo의 Architect인 Alan Gates의 article을 참조하고 실제 내가 생각할 때 중요한 이유 순서대로 reordering해보았다. 


1. Pig Latin supports splits in the pipeline.

pig는 DAG(directed acyclic graph)를 지원한다. 이게 무슨말인지 알고나면 이 부분이 pig의 가장 큰 강점이라는 생각이 든다.

SQL의 query는 하나의 result relation을 생성하는 속성을 가지고 있으므로 data processing stream을 split하고 각각의 sub stream에 다른 operation을 적용할 수가 없다.

예를 들어 하나의 data set을 읽고 여러 grouping key들로 group by를 한 결과를 저장하고 싶다고 하자(실제로 가장 빈번하게 일어나는 일 같다). 

Users         = load 'users' as (name, age, gender, zip);
Purchases     = load 'purchases' as (user, purchase_price);
UserPurchases = join Users by name, Purchases by user;
GeoGroup      = group UserPurchases by zip;
GeoPurchase   = foreach GeoGroup generate group, SUM(UserPurchases.purchase_price) as sum;
ValuableGeos  = filter GeoPurchase by sum > 1000000;
store ValuableGeos into 'byzip';
DemoGroup     = group UserPurchases by (age, gender);
DemoPurchases = foreach DemoGroup generate group, SUM(UserPurchases.purchase_price) as sum;
ValuableDemos = filter DemoPurchases by sum > 100000000;
store ValuableDemos into 'byagegender';

위의 pig script는 Users, Purchases를 읽어서 join을 하고(M/R job 1), join된 stream을 2개의 stream(group by)으로 split 한다. 매 group by마다 M/R job을 실행하는 hive에 비해 pig는 이를 한번의 M/R로 처리할 수 있는 것이다. 대부분 data가 커질 경우 M/R job에서 IO가 차지하는 비중이 높아 지는데 이 IO read/write횟수를 줄임으로 이득을 얻을 수 있다.

2. Pig Latin is procedural where SQL is declarative 

한 마디로 정의 하기 힘든 개념(부족한 능력으로)이지만 설명하기 위해 예제를 보자. click로그에서 사용자의 geoinfo중 dma code별 click count를 뽑고 싶다고 하자.

Users                = load 'users' as (name, age, ipaddr);
Clicks               = load 'clicks' as (user, url, value);
ValuableClicks       = filter Clicks by value > 0;
UserClicks           = join Users by name, ValuableClicks by user;
Geoinfo              = load 'geoinfo' as (ipaddr, dma);
UserGeo              = join UserClicks by ipaddr, Geoinfo by ipaddr;
ByDMA                = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo); 
store ValuableClicksPerDMA into 'ValuableClicksPerDMA'; 

insert into ValuableClicksPerDMA
select dma, count(*)
from geoinfo join (
select name, ipaddr
from users join clicks on (users.name = clicks.user)
where value > 0;
) using ipaddr 
group by dam;

hive에서 subquery를 없애기 위해 temp table을 이용 할 수 있을 것이다. 

create table temp (
name string,
ipaddr string
) .....

insert into temp 
select name, ipaddr
from users join clicks on (users.name = clicks.user)
where value > 0;

insert into ValuableClicksPerDMA
select dma, count(*)
from geoinfo join temp on (geoinfo.ipaddr = temp.ipaddr)
group by dam;

temp table을 이용할 경우 한개의 sql이 아닌 여러 sqls로 나뉘게 되고,  order of query즉 여러 sqls이 어떤 순서로 execute되야 하는지는 master script에 의존한다.  즉 각각의  table들이 여러 sqls에 의해 어텋게 사용되는지 알 필요가 있다는 것이다.  pipeline이 복잡해 질때 늘어나는 temp table들과 sqls마다 사용할 temp table이 모였는지 확인 하는 작업을 해보면 공감이 간다.

3. Pig Latin allows pipeline developers to decide where to checkpoint data in the pipeline.

pig가 temp table을 사용하지 않는 다는것이 약점이 될 수 있지만(failure발생 시 전체 pipeline을 다시 실행해야 할 수도 있다), 사용자가 data를 pipeline중 어떤 지점에서든지 execution을 방해 하지 않고(no additional M/R job) 저장 할 수 있게 함으로써 더 많은 자유도를 준다.

Users                = load 'users' as (name, age, ipaddr);
Clicks               = load 'clicks' as (user, url, value);
ValuableClicks       = filter Clicks by value > 0;
UserClicks           = join Users by name, ValuableClicks by user;
Geoinfo              = load 'geoinfo' as (ipaddr, dma);
UserGeo              = join UserClicks by ipaddr, Geoinfo by ipaddr;
store UserGeo into 'UserGeoIntermediate';
ByDMA                = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo);
store ValuableClicksPerDMA into 'ValuableClicksPerDMA';

4. Pig Latin allows the developer to select specific operator implementations directly rather than relying on the optimizer.

어떤 join implementation을 쓸건지, 어떤 group by  implementation을 쓸건지 명시적으로 선택 할 수 있고,  operation별로 parallelization factor 도 선택할 수 있다. 이를 다시 말하면 미리 각 operation(job)에 들어오는 data size를 예측 할 수 있다면 parallelization factor를 더 optimize할 수 있다는 얘기이다. 많은 부분 pipeline내의 data size가 예측이 가능하므로 optimze에서도 더 자유도를 얻을 수 있다.

5. Pig Latin allows developers to insert their own code almost anywhere in the data pipeline.

UDF를 어떤 부분이든지 사용할 수 있음으로해서 더 자유도를 얻을 수 있다. 
예로 hive에서는 어느정도 정형화된 format으로 가공되어 있는 data로 시작해야 하는 반면(Serde가 있긴 하지만 제한), pig는 load에도 udf를 사용할 수 있어 unstructured log를 처리 할 때 더 많은 자유도가 있다.


결론 적으로 Hive와 Pig중 둘 중 하나만 사용하는 것보다 둘다 용도에 맞게 사용하는게 맞다라는 원글의 주장을 격하게 동의한다.