Crawler With Sleep(), Condition Variable & Semaphore

Screen Shot 2015-10-04 at 4.05.03 PM

Here we have multiple crawlers which infinitely checking if the task table has any task with status=new, if so we take one task then crawl the page. If the page is a list of the links to other page, we create a task for each link and save to task table. If the page is a real news page, we save it to page table. Eventually we read all the real pages from the page table.

1. Use sleep()

What we can do is if while check if there is any new task in the table, if not, we make the thread sleep for a certain time.

void crawler() {
    while (true) {
        //before reading, needs to lock the table
        lock(taskTable);//*
        //check if there is any new task in task table
        if (taskTable.find(state == 'new') == null) {
            //give up the lock if there is nothing to consume
            release(taskTable);//*
            sleep(1000);//*
        } else {
            task = taskTable.findOne(status == 'new');
            task.state = 'working';
            //release the lock;
            release(taskTable);
            //crawl by the task url
            page = crawl(task.url);
            //if page.type is a page including a list of links to the news
            if (task.type == 'list') {
                lock(taskTable);
                for (Task newTask : page) {
                    taskTable.add(newTask);
                }
                task.state = 'done';//*
                release(taskTable);
            }
            //if page.type is a news page,
            //add to page table
            else {
                lock(pageTable);
                pageTable.add(page);
                release(pageTable);
                //we still need to update the task by setting the status to be "done"
                lock(taskTable);
                task.state = 'done';//*
                release(taskTable);
            }
        }
    }
}

So the problem using sleep is:
1. 在每次轮询时,如果t1休眠的时间比较短,会导致cpu浪费很厉害;
2. 如果t1休眠的时间比较长,又会导致应用逻辑处理不够及时,致使应用程序性能下降。

2. Conditional Variable

我们利用条件变量(Condition Variable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。

一个Condition Variable总是和一个Mutex搭配使用的。一个线程可以调用cond_wait()在一个Condition Variable上阻塞等待,这个函数做以下三步操作:
1. 释放Mutex
2. 阻塞等待
3. 当被唤醒时,重新获得Mutex并返回

注意:3个操作是原子性的操作,之所以一开始要释放Mutex,是因为需要让其他线程进入临界区去更改条件,或者也有其他线程需要进入临界区等待条件。

In this case, we put all the waiting crawler thread into cond.waitList if there is any.

cond_wait(cond, mutex):

cond_wait(cond, mutex) {
    lock(cond.waitList);
    cond.waitList.add(Thread.this);
    release(cond.waitList);

    release(mutex);
    sleep();
    lock(mutex);
}

How to use cond_wait()?

lock(mutex);
while (!condition) {
    cond_wait(cond, mutex);
}
change the condition
unlock(mutex);

cond_signal(mutex):

cond_signal(cond) {
    lock(cond);
    if (cond.waitList.size() > 0) {
        thread = cond.waitList.pop();
    }
    wakeup(thread);
    release(cond);
}

How to use cond_signal(mutex)?

lock(mutex);
set condition = true;
cond_signal(cond);
unlock(mutex);

So the crawler will become:

void crawler(){
	while(true){
		lock(taskTable);
		//it has to be while instead of if
		while(taskTable.find(state=='new')==null){
			cond_wait(cond, taskTable);//*
		}
		task = taskTable.findOne(state == 'new');
		task.state = 'working';
		release(taskTable);//*

		page = crawl(task.url);

		if(task.type=='list'){
			lock(taskTable);
			for(Task newTask:page){
				taskTable.add(newTask);
				cond_signal(cond); //*
			}
			task.state = 'done';
			release(taskTable);
		}else{
			lock(pageTable);
			pageTable.add(page);
			unlock(pageTable);

			lock(taskTable);
			task.state = 'done';
			unlock(taskTable);
		}
	}	
}

3. Semaphore

信号量一般是对整数资源进行锁。可以认为是一种特殊的条件变量。

wait(semaphore):

this means if semaphore is larger than 0, if not, it starts waiting until both happens:

1. semaphore.value>=0

2. someone wakes it up

wait(semaphore){
	lock(semaphore);
	semaphore.value--;
	while(semaphore.value<0){
		semaphore.processWaitList.add(this.process);
		release(semaphore);
		block(this.process);
		lock(semaphore);
	}
}

signal(semaphore):

signal(semaphore){
	lock(semaphore);
	semaphore.value++;
	if(semaphore.processWaitList.size()>0){
		process = semaphore.processWaitList.pop();
		wakeup(process);
	}
	release(semaphore);
}

So the crawler now becomes:

while(true){
	//we are not querying taskTable here so no need to lock taskTable
	wait(numOfNewTasks); //*
	lock(taskTable);
	task = taskTable.findOne(state =='new');
	task.state = 'working';
	release(taskTable);

	page = crawl(task.url);

	if(task.type == 'list'){
		lock(taskTable);
		for(Task newTask : page){
			taskTable.add(newTask);
			signal(numberOfNewTask); //*
		}
		task.state = 'done';
		release(taskTable);
	}else{
		lock(pageTable);
		pageTable.add(page);
		release(pageTable);

		lock(taskTable);
		task.state = 'done';
		release(taskTable);
	}
}

Ref: jiuzhang.com

FacebookTwitterGoogle+Share

System Design SNAKE原则 (以NetFlix为例)

  1. Scenario
  2. Necessary: constrain/hypothesis
    1. Daily active user? Ask! eg. 5,000,000
    2. Predict
      1. User
        1. Average Concurrent Users = daily_active_user * average_online_time / daily_seconds
          = 5,000,000 * (30*60) / (24*60*60)
          = 104,167/s
        2. Peak users = average_concurrent_users * 6 = 625,000/s
      2. Traffic
        1. Video traffic speed = 3mbps
        2. MAX
      3. Memory
        1. Memory per user = 10KB
        2. MAX daily memory = 5,000,000 * 2 * 10 = 100GB
          (T级以内的内存都是可以解的)
      4. Storage
        1. Total movie = 14,000
        2. Movie storage (视频会有不同版本) = total_movie * average_movie_size = 14,000*50GB = 700TB
  3. Application: service/algorithm 模块设计
  4. Kilobit: data 数据设计, 不同数据的存储模型
    1. 比如用户服务可以用mysql, 查询逻辑强
    2. 电影文件就用文件存,不用数据库
  5. Evolve: 和面试官沟通
    1. Step1: Analyze
      1. with
        1. More constrains
        2. New use cases
        3. Deeper, more details
      2. from the views of
        1. Performance
        2. Scalability
        3. Robustness
    2. According to 面试官, 加深某一部分的设计

 

Producer & Consumer with Java wait(), notify() and notifyAll()

Ref:
Java Thread: notify() and wait() examples
How to Work With wait(), notify() and notifyAll() in Java?

1. Keywords & Methods:

  • Synchronized
    • synchronized keyword is used for exclusive accessing.
    • To make a method synchronized, simply add the synchronized keyword to its declaration. Then no two invocations of synchronized methods on the same object can interleave with each other.
  • wait()
    • Tells the calling thread to give up the monitor and go to sleep until some other thread enters the same monitor and calls notify().
      General syntax for calling wait() method is like this:

      synchronized (lockObject){
              while (!condition) {
                  lockObject.wait();
              }
              //take the action here;
      }
  • notify()
    • Wakes up the first thread that called wait() on the same object. It should be noted that calling notify() does not actually give up a lock on a resource. It tells a waiting thread that that thread can wake up. However, the lock is not actually given up until the notifier’s synchronized block has completed. So, if a notifier calls notify() on a resource but the notifier still needs to perform 10 seconds of actions on the resource within its synchronized block, the thread that had been waiting will need to wait at least another additional 10 seconds for the notifier to release the lock on the object, even though notify() had been called.
      synchronized(lockObject)
      
          {
              //establish_the_condition;
      
              lockObject.notify();
      
              //any additional code if needed
          }//lock is given up after synchronized block is ended
  • notifyAll()
    • Wakes up all threads that are waiting on this object’s monitor. The highest priority thread will run first in most of the situation, though not guaranteed. Other things are same as notify() method above.
      synchronized(lockObject)
      {
          establish_the_condition;
       
          lockObject.notifyAll();
      }

2. Example

Producer and Consumer:

1) Producer thread produce a new resource in every 1 second and put it in ‘taskQueue’.
2) Consumer thread takes 1 seconds to process consumed resource from ‘taskQueue’.
3) Max capacity of taskQueue is 5 i.e. maximum 5 resources can exist inside ‘taskQueue’ at any given time.
4) Both threads run infinitely.

Producer:

public class Producer implements Runnable {
    private List<Integer> taskQueue;
    private int MAX_CAPACITY;
    //instead of having an int counter,
    //we use a wrapper object, so that it can be passed by reference
    private Counter counter;

    public Producer(List<Integer> taskQueue, int max_capacity, Counter counter) {
        this.taskQueue = taskQueue;
        this.MAX_CAPACITY = max_capacity;
        this.counter = counter;
    }

    @Override
    public void run() {
        //infinite loop so that producer keeps producing elements at regular interval.
        while (true) {
            synchronized (taskQueue) {
                try {
                    //here it has to be while instead of if since if after it is waken up,
                    //someone else takes the lock and change the taskQueue size
                    //if check will still go through and start producing
                    while (taskQueue.size() == MAX_CAPACITY) {
                        System.out.println(Thread.currentThread().getName() +
                                ": Queue(size=" + taskQueue.size() + ")is full, now start waiting...");
                        taskQueue.wait();
                    }
                    //simulating time delays in consuming elements.
                    Thread.sleep(100);
                    taskQueue.add(counter.val);
                    System.out.println(Thread.currentThread().getName() + ": Produced:" + counter.val);
                    counter.increase();
                    //Calling notifyAll() because the last-time wait() method was called by consumer thread
                    //(that’s why producer is out of waiting state), consumer gets the notification.
                    taskQueue.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //do something else
        }
    }
}

1) Here “produce(counter++)” code has been written inside infinite loop so that producer keeps producing elements at regular interval.
2) We have written the produce() method code following the general guideline to write wait() method as mentioned in first section.
3) Once the wait() is over, producer add an element in taskQueue and called notifyAll() method. Because the last-time wait() method was called by consumer thread (that’s why producer is out of waiting state), consumer gets the notification.
4) Consumer thread after getting notification, if ready to consume the element as per written logic.
5) Note that both threads use sleep() methods as well for simulating time delays in creating and consuming elements.

Consumer:

public class Consumer implements Runnable {
    private List<Integer> taskQueue;

    public Consumer(List<Integer> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        //infinite loop so that consumer keeps consuming elements whenever it finds something in taskQueue..
        while (true) {
            synchronized (taskQueue) {
                try {
                    while (taskQueue.size() == 0) {
                        System.out.println(Thread.currentThread().getName() +
                                ": Queue(size=" + taskQueue.size() + ")is empty, now start waiting...");
                        taskQueue.wait();
                    }
                    //simulating time delays in consuming elements.
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName() + ": consuming " + taskQueue.remove(0));
                    //Once the wait() is over, consumer removes an element in taskQueue and called notifyAll() method.
                    //Because the last-time wait() method was called by producer thread
                    //(that’s why producer is in waiting state), producer gets the notification.
                    taskQueue.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //do something else
        }
    }
}

1) Here “consume()” code has been written inside infinite loop so that consumer keeps consuming elements whenever it finds something in taskQueue..
2) Once the wait() is over, consumer removes an element in taskQueue and called notifyAll() method. Because the last-time wait() method was called by producer thread (that’s why producer is in waiting state), producer gets the notification.
3) Producer thread after getting notification, if ready to produce the element as per written logic.

Counter:

Noted that we need a counter as an id for each task. Since we might need more than one producer, instead of passing int or Integer(both of them are immutable), we need to have a wrapper object so that we can pass by reference to be shared between multiple producers.

public class Counter {
    int val;
    public Counter(){
        val = 0;
    }
    public void increase(){
        val++;
    }
    public void decrease(){
        val--;
    }
}

Test:

public class ProducerConsumer {
    public static void main(String[] args) {
        Counter c = new Counter();
        List<Integer> taskQueue = new ArrayList<>();
        Thread tProducer1 = new Thread(new Producer(taskQueue, 5, c), "Producer1");
        Thread tProducer2 = new Thread(new Producer(taskQueue, 5, c), "Producer2");
        Thread tConsumer1 = new Thread(new Consumer(taskQueue), "Consumer1");
        Thread tConsumer2 = new Thread(new Consumer(taskQueue), "Consumer2");
        tProducer1.start();
        tProducer2.start();
        tConsumer1.start();
        tConsumer2.start();
    }
}

Output:

Producer1: Produced:0
Producer1: Produced:1
Consumer2: consuming 0
Consumer2: consuming 1
Consumer1: Queue(size=0)is empty, now start waiting…
Producer2: Produced:2
Consumer1: consuming 2
Consumer2: Queue(size=0)is empty, now start waiting…
Producer1: Produced:3
Consumer2: consuming 3
Consumer1: Queue(size=0)is empty, now start waiting…
Consumer2: Queue(size=0)is empty, now start waiting…
Producer2: Produced:4
Consumer2: consuming 4
Consumer2: Queue(size=0)is empty, now start waiting…
Consumer1: Queue(size=0)is empty, now start waiting…
Producer1: Produced:5
Producer1: Produced:6
Producer1: Produced:7
Producer1: Produced:8
Producer1: Produced:9
Producer1: Queue(size=5)is full, now start waiting…
Consumer1: consuming 5
Consumer2: consuming 6
Consumer2: consuming 7
Consumer2: consuming 8
Consumer2: consuming 9
Consumer2: Queue(size=0)is empty, now start waiting…

 

Design A Typeahead

Reference:

Facebook Typeahead

Screen Shot 2015-09-30 at 10.30.58 PM

1. Preload 1st Degree Data into Browser Cache

Screen Shot 2015-09-30 at 10.41.43 PM

Once Alice clicks the search box, it sends off a request(basically calling first-degree.php in this case) to retrieve all of the user’s direct friends, pages, groups, applications, and upcoming events. Then save it in the browser cache. So that it can immediately show the results without sending another request. 

2. AJAX request and Load Balancer

Screen Shot 2015-09-30 at 10.46.25 PM

Now Alice types ‘B’, it should first show Bob since it is in the browser cache. Then it fires an ajax request (typeahead.php in this case), the load balancer is responsible for routing the request to different servers. Typically each server only handles one specific category of results(friend-of-friend, object-of-friend, events, etc). 

Those blue rectangles are services which could be applied on multiple machines. The global service is for something which are independent to querying user. Like the most popular game or event, since ther we can save latency by storing recent results in a memcached-based query cache.

3. Aggregator

Aggregator delegates queries to multiple lower-level services in parallel and integrating their results.

4. Fetching Data and Validating Results

The results returned by the aggregator are simply a list of ids. The web tier needs to fetch all the data from memcache/MySQL to render the results and display information like the name, profile picture, link, shared networks, mutual friends, etc. The web tier also needs to do privacy checking here to make sure that the searcher is allowed to see each result.

5. Displaying the Results

The results with all the relevant data are sent back to the browser to be displayed in the typeahead. These results are also added to the browser cache along with the bootstrapped connections so that similar subsequent queries don’t need to hit the backend again.

Typeahead Algorithm

 

 

Elevator Design

How would I design the elevators for a new 40 story office

building that had an average of 100 people per floor to most efficiently fill and empty the building given a standard 9-5 workday and traffic conditions in my city? The answer needed to be completely detailed, including expected passengers per car, time per stop, average floors stops per trip at various hours, etc.

1. Ask

  1. How many elevators are there?
  2. What is the capacity of each elevator?
  3. Is the efficiency goal focused only at the start & end of day & not in between (i.e. lunch time, breaks)?

2. General optimal critiera

  • provide even service to each floor
  • minimize how long passengers wait for an elevator to arrive
  • minimize how long passengers spend to get to their destination floor
  • serve as many passengers as possible

The first advice that I’ve read is to ask some questions before you start answering. It will show that you are strategic & don’t jump to random assumptions. So I will probably ask questions like: Is the efficiency goal focused only at the start & end of day & not in between (i.e. lunch time, breaks)? How many elevators are there? What is the capacity of each elevator?

2) Assuming that everything is average, i.e. 6 elevators, 15 people per elevator, and focus only on start and end date, then the sample data should follow a normal distribution.

730-8 – 2%

8-830 – 14%

830-9 – 34%

9-930 – 34%

930-10 – 14%

10-1030 – 2%

3) I will break this down & solve the worst case scenario first. This means, 34 people x 40 floors = 1360 people to be transported by 6 elevator x 15 = total 90 capacity during 830-9 or 9-930 am.

4) Focusing on this more manageable problem, 1360 / 90 means each elevator will make 15 full cycles (lobby to highest floor and back) 5) Since we want to minimize the cycle time for each elevator, we assign one elevator per subset of 40/6 consecutive floors. This should address the issue on minimizing time per stop. 6) That means, the final design should be a load balancing of the elevators by minimizing the travel time — Elevator A – 1st to 7th floor, B – 8th to 14th floor, and so forth. Do you guys see anything wrong with this line of thinking?

先说说我对单个电梯设计的想法(欢迎批评指正)

1 Elevator Object, 应该包含physical components: Door, Indicator Lights,
Control Panel. 一些性质(Non physical properties): Speed, Num of floors,
capacity, max weight. 所能从事的操作methods: moveto, stop, ringbell。然后电
梯应该能够handle user request, 所以还应有一个requestQueue, 电梯应该根据自己
的state 和 requestQueue做出moveto, stop的决定,所以有一component:
requestHandler(Strategy pattern),可以set不同的requestHanlder.

2 Door, properties: State, method: open, close, getState.

3 Indicator light(指示所到楼层),properties: state; method: on, off,
getState

4 Control Panel, 包含physical component: Floor Buttons, Other buttons(也可直
接把Buttons 当作 elevator的components,还没考虑哪一个方法好)

5 Button, properties: floorNum, Parent Elevator, methods: OnPress(Observer
Pattern).

6 ElevatorRequestHandler: handleRequest(Elevator ele, requestList rlist), 可
以define 一个interface, 然后又各种不同实现

7 Request: 可以define 一个abstract class, 然后有子类movingRequest,
helpRequest doorRequest etc.

A Single Elevator

Use Case:

  1. User
    1. press a button to summon the lift
    2. press a button to get to a specific floor
  2. Button
    1. floor button and level button
    2. illuminates when pressed
    3. place an ‘elevator request’ when pressed
  3. Elevator
    1. moves up/down
    2. open/close the door

ElevatorRequests Class

Each button press results in an elevator request which has to be served. Each of these requests is tracked at a global place. ElevatorRequests, the class which stores elevator requests can use different strategies to schedule the elevator requests.

ElevatorController

The elevator is controlled by a controller class which we call ElevatorController. The elevator controller instructs the elevator what to do and also can shutdown/start up the elevator of the building. The elevator controller reads the next elevator request to be processed and serves it.

Button (Abstract) Class

Button is abstract class defining common behavior like illuminate, doNotIlluminate. FloorButton, ElevatorButton extend Button type and define placeRequest() which is invoked when the button is pressed.

In conclusion, ElevatorController runs the show by reading the ElevatorRequests to process and instructing the Elevator what to do. User send request by pressing Buttons.

Extend the answer to multiple elevators

  1. Each elevator have 1 controller.
  2. Floor based requests can be served by any elevator, thus these requests are added to a common area accessible by all controllers.
  3. Each elevator controller runs as a separate thread and checks if it can process a floor request. Mind synchronization issues.

http://www.columbia.edu/~cs2035/courses/ieor4405.S13/p14.pdf

https://github.com/joeblau/sample-elevator-control-system

http://blog.jobbole.com/74672/

http://blog.gssxgss.me/elevator-simulator-java/

Dynamo: Amazon’s Highly Available Key-value Store 论文笔记

Dynamo: Amazon’s Highly Available Key-value Store

  1. System Assumptions and Requirements in this case

    1. High write availability (this is based on their use cases like shopping carts, user should be able to update the shopping carts anytime). So the design is also writable and resolve conflicts when read.
    2. Query model is simple read and write operations to a data item which is uniquely identified by unique keys. No need for relational schemas. (Which is also based on the observation of some Amazon’s services.)
    3. ACID(Atomicity, Consistency, Isolation, Durability) are not strictly followed since it targets applications that tolerant weaker consistency, which is called eventually consistency.
  2. Design Considerations

    1. When to resolve update conflicts? Read or Write?
      1. Since it focus on high write availability, so it pushes conflict resolution to reads (which unlike many traditional DBs which execute conflict resolution during writes and has simple policy for reads)
    2. Who to resolve the conflicts? The data store or application?
      1. The application is responsible to resolve conflict updates. Since data store only has simple police like “last write wins” to resolve conflicts while application has more knowledge of each different situations and could have different strategy to resolve conflicts.
    3. Incremental scalability
      1. Add/Delete one node at a time without having a huge impact on both read/writes of the system.
    4. Symmetry
      1. No outstanding nodes. Each node should have the same responsibilities as its peers.
  3. Architecture

    Problem

    Technique

    Advantage

    Partitioning

    Consistent Hashing

    Incremental Scalability

    High Availability for writes

    Vector clocks with reconciliation during reads

    Version size is decoupled from update rates.

    Handling temporary failures

    Sloppy Quorum and hinted handoff

    Provides high availability and durability guarantee when some of the replicas are not available.

    Recovering from permanent failures

    Anti-entropy using Merkle trees

    Synchronizes divergent replicas in the background.

    Membership and failure detection

    Gossip-based membership protocol and failure detection.

    Preserves symmetry and avoids having a centralized registry for storing membership and node liveness information.

    sosp-figure2-small

    1. Partitioning (Consistent Hashing)
      1. Both node and key are mapped to the same hash space (eg. 00~FF)
      2. Key K is stored in B, which means B is responsible for K
      3. Pros:
        1. Load balance (each node would get roughly similar number of keys)
        2. Scalability (add/delete one nodes, only its neighbors would be affected)
    2. Replication  
      1. Dynamo is setup, N is assigned as a parameter indicating each data item is replicated on N nodes.
      2. Each key contains a list of nodes which is responsible for its read/write operation. Which is called Preference List. Length of the preference list should be larger than N just in case nodes failures.
      3. Using the consistent hashing, each node finds its coordinator, who is responsible to replicate the data to N-1 clockwise successor nodes.
    3. Versioningsosp-figure3-small
      1. Vector Clock is used to show if there are update conflicts. Mainly used in key-value storage which doesn’t have locks for writes to pursue better performance.
      2. D5([Sx, 3],[Sy, 1],[Sz,1]) means data item 5 which was updated by Sx 3 times, Sy 1 time, Sz 1time. Using the vector, it is easily to find out if two different version are parallel.
      3. When reads the data, the vector clock is also included in the data item.
      4. Deep understanding and examples, please check here
      5. Cons: Vector Clock some times could be too long if there are many different servers involved in writes. But in real cases it should not happen since writes are generally handled by top N nodes in the preference list of that key. Even if it happens, we can have a upper bound size of the vector clock and get rid of the old vectors depending on the timestamp, which might potentially cause problems when trying to resolve conflicts.
    4. Get() & Put() operation
      1. Only first N healthy nodes in the preference list are involved. (those are down and inaccessible are skipped)
      2. W + R > N (W/R: number of nodes which should success for writes/reads)
      3. When put(), the coordinator generates the vector clock with the new version and writes the new version locally. Then replicates the new version to first N reachable node in the preference list. Consider write successful as long as there is W-1 nodes respond.
      4. Similarly, for get(), the coordinates request the data from first N reachable nodes from the preference list and as long as there are R-1 response it will then returns all version of the data.
    5. Failure handling (Hinted Handoff)
      1. Check the Dynamo ring above, if node A is down, the data item which is supposed to written to A is now written to D (suppose N=3) along with the metadata (indicating which node it is supposed to be at) which is stored separately in D
      2. Once such hint is discovered, and A is recovered, D will send the replica to A and then delete the replica from itself.
      3. Hinted Handoff ensures read/writes won’t be rejected due to single node down or network failure.
    6. Recovering from permanent failures、Membership and failure detection待进一步整理。

Reference: http://blog.ddup.us/2011/11/07/amazon-dynamo/