Wednesday, March 6, 2013

Pig + Hive, not Pig or Hive

Hadoop ecosystem에서 Data processing 을 위해서 Pig와 Hive가 탄생한지도 시간이 많이 되었다. 
많은 사람들(??)이 Pig와 Hive를 사용하지만 대부분 둘 다 비슷한거 아니냐는 질문을 많이 하고(심지어 팀에서는 pig와 hive로 같은 data pipeline의 performance를 테스트 해본경우도 있다;; 삽질), 나 역시 처음에는 sql에 익숙하면 Hive, 아니면 Pig라고 생각했었다. 1년간 Pig를 나름 유용하게 사용해오면서 지금은 Pig or Hive가 아닌 Pig + Hive여야 한다는 주장을 하려고(대부분 이렇게 생각하나??)한다. 


첫 번째로 해야 할 질문은 그럼 이 두 가지 멋진 tool이 무슨 문제를 해결 하려고 고민 하다가 생긴 것일까 이다.
모든 마찬가지지만 문제를 이해하는 것이 가장 중요하므로 문제를 자세히 살펴보자.
한마디로 문제는 data processing, 좀 더 설명하면 아래와 같다.

Data processing
  1. data collection: 데이터를 여러 source로부터 모은다. 
  2. data preparation(ETL, data factory): raw data로 부터 data user가 사용 할 수 있도록 data sets을 생성한다.
  3. data presentation(DW): users이 off of the shelves로 data를 select해볼 수 있도록 data sets을 저장해 놓는다.
Pig와 Hive는 위에서 2,3번, 즉 ETL과 DW단계를 위해서 만들어 졌다라고 볼 수 있다(이 단계들에 가장 적합하다라고 하는게 더 정확 할 수 있다).

그럼 ETL과 DW는 몬가? 왜 ETL은 Pig로, DW는 Hive로 라고 주장하는 지 설명하기 위해 ETL과 DW의 간단한(나도해본??) use cases를 살펴 보자.
먼저 ETL
  1. pipelines: data의 stream을 feed, clean, transform해서 DW에 적재 한다.
  2. iterative processing: very large data set와 매일 들어오는 small new data간의 join.
  3. research: unknown schemas와 unstructured data들을 원하는 data로 transform한다.
그러면 DW는?
  1. business-intelligence analysis: data를 BI와 연결한다.
  2. ad-hoc queries for analytics
일단 DW에 대한 선택은 명확히 Hive이므로 생략한다.
그럼 비교할 건 ETL을 Hive로 하는 것과 Pig로 하는 것이다.
결론 부터 말하면 ETL은 Pig가 더 적합하다. Hive로도 ETL을 할 수 있고 하는 경우를 많이(??) 보았지만 Pig가 ETL에서 더 적합한 이유에 대해 설명해 보고자 한다. 원글은 pig의 PMC인 yahoo의 Architect인 Alan Gates의 article을 참조하고 실제 내가 생각할 때 중요한 이유 순서대로 reordering해보았다. 


1. Pig Latin supports splits in the pipeline.

pig는 DAG(directed acyclic graph)를 지원한다. 이게 무슨말인지 알고나면 이 부분이 pig의 가장 큰 강점이라는 생각이 든다.

SQL의 query는 하나의 result relation을 생성하는 속성을 가지고 있으므로 data processing stream을 split하고 각각의 sub stream에 다른 operation을 적용할 수가 없다.

예를 들어 하나의 data set을 읽고 여러 grouping key들로 group by를 한 결과를 저장하고 싶다고 하자(실제로 가장 빈번하게 일어나는 일 같다). 

Users         = load 'users' as (name, age, gender, zip);
Purchases     = load 'purchases' as (user, purchase_price);
UserPurchases = join Users by name, Purchases by user;
GeoGroup      = group UserPurchases by zip;
GeoPurchase   = foreach GeoGroup generate group, SUM(UserPurchases.purchase_price) as sum;
ValuableGeos  = filter GeoPurchase by sum > 1000000;
store ValuableGeos into 'byzip';
DemoGroup     = group UserPurchases by (age, gender);
DemoPurchases = foreach DemoGroup generate group, SUM(UserPurchases.purchase_price) as sum;
ValuableDemos = filter DemoPurchases by sum > 100000000;
store ValuableDemos into 'byagegender';

위의 pig script는 Users, Purchases를 읽어서 join을 하고(M/R job 1), join된 stream을 2개의 stream(group by)으로 split 한다. 매 group by마다 M/R job을 실행하는 hive에 비해 pig는 이를 한번의 M/R로 처리할 수 있는 것이다. 대부분 data가 커질 경우 M/R job에서 IO가 차지하는 비중이 높아 지는데 이 IO read/write횟수를 줄임으로 이득을 얻을 수 있다.

2. Pig Latin is procedural where SQL is declarative 

한 마디로 정의 하기 힘든 개념(부족한 능력으로)이지만 설명하기 위해 예제를 보자. click로그에서 사용자의 geoinfo중 dma code별 click count를 뽑고 싶다고 하자.

Users                = load 'users' as (name, age, ipaddr);
Clicks               = load 'clicks' as (user, url, value);
ValuableClicks       = filter Clicks by value > 0;
UserClicks           = join Users by name, ValuableClicks by user;
Geoinfo              = load 'geoinfo' as (ipaddr, dma);
UserGeo              = join UserClicks by ipaddr, Geoinfo by ipaddr;
ByDMA                = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo); 
store ValuableClicksPerDMA into 'ValuableClicksPerDMA'; 

insert into ValuableClicksPerDMA
select dma, count(*)
from geoinfo join (
select name, ipaddr
from users join clicks on (users.name = clicks.user)
where value > 0;
) using ipaddr 
group by dam;

hive에서 subquery를 없애기 위해 temp table을 이용 할 수 있을 것이다. 

create table temp (
name string,
ipaddr string
) .....

insert into temp 
select name, ipaddr
from users join clicks on (users.name = clicks.user)
where value > 0;

insert into ValuableClicksPerDMA
select dma, count(*)
from geoinfo join temp on (geoinfo.ipaddr = temp.ipaddr)
group by dam;

temp table을 이용할 경우 한개의 sql이 아닌 여러 sqls로 나뉘게 되고,  order of query즉 여러 sqls이 어떤 순서로 execute되야 하는지는 master script에 의존한다.  즉 각각의  table들이 여러 sqls에 의해 어텋게 사용되는지 알 필요가 있다는 것이다.  pipeline이 복잡해 질때 늘어나는 temp table들과 sqls마다 사용할 temp table이 모였는지 확인 하는 작업을 해보면 공감이 간다.

3. Pig Latin allows pipeline developers to decide where to checkpoint data in the pipeline.

pig가 temp table을 사용하지 않는 다는것이 약점이 될 수 있지만(failure발생 시 전체 pipeline을 다시 실행해야 할 수도 있다), 사용자가 data를 pipeline중 어떤 지점에서든지 execution을 방해 하지 않고(no additional M/R job) 저장 할 수 있게 함으로써 더 많은 자유도를 준다.

Users                = load 'users' as (name, age, ipaddr);
Clicks               = load 'clicks' as (user, url, value);
ValuableClicks       = filter Clicks by value > 0;
UserClicks           = join Users by name, ValuableClicks by user;
Geoinfo              = load 'geoinfo' as (ipaddr, dma);
UserGeo              = join UserClicks by ipaddr, Geoinfo by ipaddr;
store UserGeo into 'UserGeoIntermediate';
ByDMA                = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo);
store ValuableClicksPerDMA into 'ValuableClicksPerDMA';

4. Pig Latin allows the developer to select specific operator implementations directly rather than relying on the optimizer.

어떤 join implementation을 쓸건지, 어떤 group by  implementation을 쓸건지 명시적으로 선택 할 수 있고,  operation별로 parallelization factor 도 선택할 수 있다. 이를 다시 말하면 미리 각 operation(job)에 들어오는 data size를 예측 할 수 있다면 parallelization factor를 더 optimize할 수 있다는 얘기이다. 많은 부분 pipeline내의 data size가 예측이 가능하므로 optimze에서도 더 자유도를 얻을 수 있다.

5. Pig Latin allows developers to insert their own code almost anywhere in the data pipeline.

UDF를 어떤 부분이든지 사용할 수 있음으로해서 더 자유도를 얻을 수 있다. 
예로 hive에서는 어느정도 정형화된 format으로 가공되어 있는 data로 시작해야 하는 반면(Serde가 있긴 하지만 제한), pig는 load에도 udf를 사용할 수 있어 unstructured log를 처리 할 때 더 많은 자유도가 있다.


결론 적으로 Hive와 Pig중 둘 중 하나만 사용하는 것보다 둘다 용도에 맞게 사용하는게 맞다라는 원글의 주장을 격하게 동의한다.


Monday, December 3, 2012

Personalized Pagerank for link prediction

I read Edwin Chen`s blog(who is famous data scientist, my role model!) and read interesting post.

Problem was Edge Prediction in given social graph. after I read through Edwin`s blog, I decided to implement it with Map/Reduce since I am not fluent with Scalding and scalar yet(These days I have been busy to catching up Functional Programming course from Coursera and it`s so much fun).

excerpt from Edwin`s blog

Briefly, a personalized PageRank is like standard PageRank, except that when randomly teleporting to a new node, the surfer always teleports back to the given source node being personalized (rather than to a node chosen uniformly at random, as in the classic PageRank algorithm).

I was wondering how efficient to compute personalized pagerank for all vertices in graph. using Map/Reduce with mahout`s matrix library, this computation becomes following.

1. build following matrix
2. build follower matrix
3. union following matrix and follower matrix and row normalize => undirected adjacency matrix
4. create digonal matrix and init startMatrix, prevMatrix as digonal.
5. updatedMatrix = startMatrix * (1 - alpha) + (prevMatrix * adjacency matrix) * alpha, iterate this # times.

Even though Edwin built many features and build random forest for awesome model, I think in reality, vertex-vertex similarity computation will become too costly so just focus on scaling personalized pagerank up to large scale.

To test Map/Reduce`s perfomance, I used twitter-2010 dataset.
following is dataset`s stat.
nodes41 652 230
arcs1 468 365 182
bits/link13.897 (64.29%)
bits/link (transpose)13.148 (60.83%)
average degree35.253
maximum indegree770 155
maximum outdegree2 997 469
dangling nodes3.72%
buckets0.09%
largest component33 479 734 (80.38%)
spid0.17 (± 0.001)
average distance4.46 (± 0.002)
reachable pairs81.08% (± 0.257)
median distance5 (73.16%)
harmonic diameter5.29 (± 0.016)


Note that maximum degree can be over 3000000 which quickly become computational bottleneck. so I pruned vertices with more than 10000 degree which we actually doens`t care much since this vertices already have too much link.

The result was better than I expected. Map/Reduce runs less than 2 hours on 8cores, 16G, 10 datanode cluster for 3 iterations.

I pushed my code into my collaborative filtering experiment codebase as class PersonalizedPageRank.
I will port this code into Giraph and compare runtime.

Other than performance, I plan to evaluate personalized pagerank based link prediction by using PPR score as recommendations and actual training data as test set. think calculating MAP on topK can be used evaluation metric. I will update this metric in a few days.

RC File

I have been wondering how RC file actually works so I took a look at paper.
followings are simple review of paper.

In general, for map/reduce based data warehouse, following requirement needs to be addressed.


Requirement
1. fast loading: able to load large amount data fast.
2. fast query process: query execution should run fast.
3. efficient storage space utilization: 
4. strong adaptivity to highly dynamic workload patterns: 

To achieve above requirement, there have been 3 major data placement scheme.

1. row-store
cons
  • can`t provide fast query execution(full scan on all records, no skip on unnecessary columns)
  • can`t get high storage space utilization since it is hard to compress row efficiently
 pros
  • fast data load
  • strong adaptive ability to dynamo 
2. column-store
cons
  • tuple reconstruction cost lots => query processing slow down
  • record reconstruction overhead => since record reside on multiple location in cluster, network cost goes high.
  • require pre-knowledge of possible queries for creating column groups
pros
  • at query execution time, can avoid reading unnecessary columns.
  • high compression ratio
3. PAX store
cons

pros


What RC File provide
1. horizontally partition rows into multiple row groups
2. column-wise data compression within row group + lazy decompression during query execution
3. flexible row group size, trade off data compression performance query between execution performance
4. guarantees that data in the same row are located in the same node
5. column-wise data compression and skip unnecessary column reads

row group in RC File
1. sync marker: at beginning of the row group
2. metadata header: for row group: # records, each column bytes, each field in each column bytes.
3. table data section: column-store, columnA`s fields, columnB`s field….. sorted order
4. RLE for metadata header
5. each column is compressed independently using heavy-weight Gzip => lazy decompression



future work: automatically select best compression algorithm for each column according to it`s data type and data distribution

TODO: take a look at org.apache.hadoop.hive.ql.io.RCFile and others

Wednesday, November 7, 2012

Belief Propagation for music recommendations with Mapreudce And Giraph




I am taking Probabilistic Graphical Models course from Coursera. once class covers inference on markov random field using label propagation, I wanted to see how this algorithm works on real data so applied label propagation on million song dataset to feel it.

here is example of label propagation.

simple arithmetic result to following.

50% from my profile + 40% from X +10% from Z = 60% rock, 40% jazz.


Generalization


Here is actual formulation. here markov random field is represented by factor graph.



Above formulation can be implemented by matrix multiplication using Map/Reduce.

definition


Lets say graph structure as G, and set of Concept C = {c1, c2,...} where each ci is consist of set of vertices in G. ci represent prior per concept i. then we want to calculate pair-wise posterior given G, C.

implementation


iteration 0: CV(|C| x |V|) x G(|V| x |V|) = CV'(|C| x |V|)
iteration 1: CV'(|C| x |V|) x G(|V| x |V|) = CV''(|C| x |V|)
..
..

each iteration is simply matrix multiplication between Concept-Vertices CV matrix and static graph structure G.

This operation quickly becomes huge. so I first implemented this as DistributedRowMatrix in Mahout.

DistributedRowMatrix class provide following APIs.
1. tranpose: this.tranpose()
2. times(DistributedRowMatrix other): this.transpose().times(other)

using above API, label propagation in Map/Reduce becomes following.

1. init Concept-Vertices matrix CV.
2. normalize Graph for convenience if G is not normalized.
3. create CV, G using DistributedRowMatrix class.
4. for # iteration CV = CV x G

Following code demonstrate how DistributedRowMatrix in mahout library becomes handy.

1:    DistributedRowMatrix CV =   
2:    createInitialCV(numItems, getTempPath("initial.class"),   
3:                    getConf());  
4:    for (int i = startIteration; i < iterations; i++) {  
5:     log.info("current iteration: {}", iteration);  
6:     /*  
7:       GNorm is DistributedRowMatrix   
8:       contain vertex-vertex graph structure.  
9:       DistributedRowMatrix.times calculate  
10:      this.transpose().times(other). so transpose itself first.  
11:      */  
12:     CV = CV.transpose().times(GNorm)  
13:    }  


Note that using Map/Reduce for iterative job is inefficient, so why not try Giraph?

in Graph-Parallel environment, problems become following.

1. each vertex has it`s neighbor edges in G.
2. at superstep 0, some vertex has C vector as value([ci:prior, cj:prior...]). if vertex has C vector, then send C to all of it`s neighbors otherwise don`t send it.
3. after superstep 0, all vertex get messages([vertex_id j, C vector]) from each of it`s neighbors.
if current vertex is Vi, and message is [Vj, Cvj] then edge(Vi, Vj) / Vi`s all Edge sum * C is added to Vi`s value Cvi vector. merge all concept-prior vectors sent to each vertex and update value(Cvi).
4. if iteration is not done, send value(Cvi) to all neighbors.

Following code is compute method in VertexProgram to implement above.


@Override  
  public void compute(Iterable<MultiLabelVectorWritable> messages) throws IOException {  
   /*  
    * each vertex has Vector as value.   
    * this Vector consist of [concept_id:probability,....]  
    * MultiLabelVectorWritable to represent   
    * (vertex j which sent this message, vertex j`s value Vector)   
    */  
   long step = getSuperstep();  
   if (step < iteration) {  
    // we still need to compute on this vertex.  
    Vector currentVector = getValue().get();  
    // create new messages from this vertex.  
    MultiLabelVectorWritable newMessage = new MultiLabelVectorWritable();  
    // set message source to this vertex.  
    newMessage.setLabels(new int[]{(int)getId().get()});  
    // vertex value vector [concept_id:probability] is sparse.  
    Vector newMessageVector = new RandomAccessSparseVector(minNonConceptVertexId);  
    // iterate messages sent to this vertex and merge them up to build this vertex`s vector.  
    for (MultiLabelVectorWritable message : messages) {  
     int messageId = message.getLabels()[0];  
     Vector conceptProbs = message.getVector();  
     float weight = getEdgeValue(new LongWritable(messageId)).get();  
     Iterator<Vector.Element> probs = conceptProbs.iterateNonZero();  
     while (probs.hasNext()) {  
      Vector.Element prob = probs.next();  
      int conceptId = prob.index();  
      currentVector.set(conceptId, prob.get() * weight);  
     }  
    }  
    // prunning for absorb  
    Iterator<Vector.Element> iter = currentVector.iterateNonZero();  
    while (iter.hasNext()) {  
     Vector.Element e = iter.next();  
     if (e.get() < gamma) {  
      continue;  
     }  
     newMessageVector.setQuick(e.index(), e.get());  
    }  
    newMessage.setVector(newMessageVector);  
    sendMessageToAllEdges(newMessage);  
   } else {  
    voteToHalt();  
   }  
  }  


I implemented demo using label propagation with open dataset from million song dataset challenge from Kaggle. This demo load taste profile graph data into memory and calculate on the fly rather than using Giraph for demonstration. here is github for Giraph/Mahout code and demo codes.

TODO: since test set for this data is opend(competition is over), I will measure truncated mean average precision to evaluation label propagation.



Wednesday, October 17, 2012

movie recommendation demo with matrix factorization

I was experimenting with Graphlab and Mahout for Matrix Factorization these days.

Matrix factorization transform both items and users to the same latent factor space so they can be compared directly.

Even though Mahout and Graphlab is great tool for matrix factorization, these are designed for batch process. to get recommendations for new users who rate existing movies in rate matrix, following two steps are necessary.

1) transform user-rating vector to user-latent feature vector.
2) compare all movie-latent feature vectors with 1) and calculate scores.


this demo ask user to rate movies and do 1), 2) step.

most of work is just glue codes from Mahout with Jetty. 

check out this and feel free to give me any feedback.

Update: added label propagation to find serendipities. since the training data is small enough(1.7 million user, 40 K movies,  19 million edge), just load training data into memory.

Todo: I will update with evaluation metric(RMSE, MAP, Precision-Recall) after running batch jobs for this dataset using mahout/Graphlab for ALS, Giraph for label propagation.
also, add item-based cf as baseline to compare result

Monday, September 17, 2012

Mapreduce, BSP and Graphlab

개인적인 관심사가 machine learning인지라 평소에 주로 map/reduce를 사용한 mahout을 이용해 필요한 알고리즘들을 사용하곤 했다.  요사이 매달리고 있는 matrix factorization문제를 해결 하려고 시도하다가 알게 된 점 들을 몇자 적어본다. 

mahout을 너무너무 잘 쓰고는 있지만 몇가지 map/reduce model의 근본적인 문제를 계속 마주치게 되는데, 첫 째로 iterative job처리가 inefficient하다는 것이다. 

Iterative job을 map/reduce로 구현 할 때의 문제를 좀더 자세히 들여다보면 아래와 같다.



위 그림에서 눈여겨 볼 inefficient 한 곳은 두 가지 이다.

  1. 하나의 iteration내에서도 처리해야 할 데이터가 특정 CPU(computing resource)으로 몰려서 partition될 경우 다른 CPU들은 기다리는 lagging이 발생한다. 실제로 사용자 log graph들은 많은 수의 edge를 가지는 몇개의 vertex들을 가지는 경우가 흔하고, 이런 vertex들이 bottle neck 으로 작용한다.
  2. 매 iteration마다 발생하는 barrier부분에 disk IO와 startup cost가 있다. map only job이 # iteration만큼 hdfs에서 input을 읽고, 결과를 hdfs에 쓰는 overhead가 있다.
이를 그림으로 표현 하면 아래와 같다.

이러한 iterative한 job을 효율적으로 처리하기 위해 나온 것들이 구글의 pregel이고 이와 비슷한  framework로 apache Giraph, Hama가 있다(꼭 iterative job만을 위한건 아님, 뒤에 설명). 

이중에서 사용해 본 Giraph로 간략히 설명을 하면(자세한 설명), map only잡을 한번 submit하고, map only job에서 여러벌의 worker와 master를 생성한다. 이 master는 각각의  worker에 partition ownership을 부여하고, vertices를 partitioning한다. 
각각의 worker는 자신에게 속한 vertices들에대해 compute를 하고 barrier에서  sync한다(iteration횟수만큼 반복). 
map/reduce와 달리 hadoop의 startup penalty를 한번으로 줄이고, 각각의 superstep(iteration)의 결과들을 hdfs가아닌 memory에 들고 있어 다음 superstep에서 훨씬 빠른 access가 가능하다.



둘째로  interdependent computation(graph-parallel algorithm)은 not map-reducible하다.
조금더 일반화 하자면 많은 수의 machine learning algorithm들은 다음과 같은 property를 갖는다.

현재 vertex  X3의 computing을 위해서 X3의 neighbor들의 value들이 필요한 경우가 많은데 이는 mapper끼리 통신이 불가능한 map/reduce model에서는 처리 하기 힘들다.
label propagation algorithm(graph-parallel algorithm)을 예로 그 일반적인 특성을 보자.

실제로 belief propagation같은 algorithm은 mahout의 DistributedRowMatrix를 이용하여 matrix로 바꿔서 생각하면 구현 자체는 쉽게 할 수 있다. 하지만 matrix형태로 생각하는 게 intuitive하지도 않고, 위에서 말한 첫번째 이유에 의해서도 map/reduce말고 다른 computing model을 찾게 된다.

여기서 이 포스트의 주인공인 Graphlab이 등장하게 된다.


위에서와 같이 machine learning algorithm을 data-parallel과 graph-parallel로 구분 했을 때 Graphlab은 Graph-parallel algorithm에 특화된 framework이다.

살짝 중간 정리를 하자면, map/reduce가 key, value pair에 map/reduce라는 computation을 정의 하는 framework였다면 Graphlab은 graph상의 vertex node에 gatter, apply, scatter라는 computation을 정의 하는 framework이다.

이 framework에서는 각각의 vertex는 사용자가 정의 한 vertex-program을 통해 필요한 computing을 하는데 이때 neighbor vertex들의 상태값 뿐만아니라 다른 vertex의 상태 값들도 mpi를 통해 얻어 오는 api가 제공 된다.

다른 말로 위에서 말한 sparse data dependencies는 graph를 통해, local updates는 vertex-program를 통해 만족된다.



이는 사실 위에서 언급한 Pregel(Giraph, Hama)과 거의 흡사하다. 차이점이라면 pregel은 각각의 vertex-program이 message를 통해 interact하고, Graphlab은 각각의 program이 dependencies가 있는 서로의 state를 access할 수 있다는 점이다.
무슨 소리인가? 아래를 보자.


여기서 가장 중요한 점은 asynchronously이다. 위에서 설명 했던 BSP model와의 가장 큰 차이점은 바로 이 asynchronous한 execution이다. Graphlab은 이런 asynchronous update를 scheduling하면서 consistency를 보장해 주는데, 이를 위해 아래와 같이 scope rule이라는 개념을 가진다.






결론적으로 vertex consisitency가 가장 parallel하고, full consistency가 가장 non-parallel하다. 알고리즘 별로 어떤 scope rule이 필요한지 정의하면 framework이 race condition과 deadlock이 발생하지 않도록 보장해준다.

version1(shared memory)에서는 데이터가 커지면 memory도 커져야 했었지만, version 2부터는 distributed version으로 이런 제약이 없어졌고, HDFS를 input/output으로도 사용할 수 있게 되었다. 또 매우 다양한 algorithm들을 toolkit으로 제공하고 있어서 당장 사용해 볼 수 있다는 장점도 있다. 

자세한 내용은 http://graphlab.org/에서 꼭 살펴보길 바란다. 
http://videolectures.net/nipsworkshops2010_guestrin_kml/는 SELECT lab에서 NIPS2010에 발표한 video이다. 

reference:

TODO: 
  1. Graphlab과 mahout을 benchmark 해본 결과 정리
  2. MPI + Graphlab setup tutorial
  3. Giraph 자세한 설명 페이지 정리



Saturday, July 21, 2012

running median problem

I found this interesting problem from interview street.

problem was to maintain running median from stream of numbers.

more formally,  following is full problem from interview street.


The median of M numbers is defined as the middle number after sorting them in order, if M is odd or the average number of the middle 2 numbers (again after sorting) if M is even. You have an empty number list at first. Then you can add or remove some number from the list. For each add or remove operation, output the median of numbers in the list.

Example : For a set of m = 5 numbers, { 9, 2, 8, 4, 1 } the median is the third number in sorted set { 1, 2, 4, 8, 9 } which is 4. Similarly for set of m = 4, { 5, 2, 10, 4 }, the median is the average of second and the third element in the sorted set { 2, 4, 5, 10 } which is (4+5)/2 = 4.5  
Constraints:
0 < n <= 100,000

1. naive

If n is small enough, problem is simple.
each insert/delete sort entire elements
this require n * nlogn(n operation x cost for sorting).

2. using two multiset


Thing is only few elements are affected when insert/delete operation happen.
solution using two multiset s1, s2.

invariant

1) s1 is supposed to maintain 0 ~ n/2th order statistic.
2) s2 is supposed to maintain n/2 ~ nth order statistic.
3) s1.size()  and s2.size() only differ at most 1.

to hold invariant 1, 2
we compare x(current value to insert or delete) with maximum of s1 and minimum of s2 to decide where to put x.

to hold invariant 3
we need re-balance step
this can be done in constant time by
1) moving minimum of s2 to s1
2) moving maximum of s1 to s

Solution

once 3 invariant is meet, then calculating median as problem description is trivial.

1) s1.size == s2.size(s1.size + s2.size is even), then return (maximum of s1 + minimum of s2) / 2.0
one tip we should take care of integer overflow when we average two value(32bit integer) since adding two 32bit integer can cause integer overflow.
2) otherwise if s1.size > s2.size, return maximum of s1. s1.size < s2.size, return minimum of s2.

finally checking if we can delete x is trivial, since multiset provide find method in O(logN) complexity.

just good enough pass testcases.

3. using skip list

I was wondering what about more generic cases.
few google search gives me link to skip list.
for this problem, we exactly need indexable skip list since we keep need to extract median as input comes.
obviously this problem can be solved by indexable skip list.
here is my simple implementation.

TODO
want to take a look at how redis use skip list and understand why skip list is better than self-balance-tree in distributed environment.


check out code here