MaheshT.com

June 5, 2011

Executing Quicksort in parallel using Java threads

Filed under: Java,Programming — MaheshT @ 11:56 am

Running quick sort in parallel is easier in java compare to merge sort because each task in quicksort is independent of each other. (In merge sort, you need to wait for subtasks to be finished before the merge is done)
In the merge task, we keep on dividing the array until it is above the threshold and creating subtask to sort it. All subtasks are submitted to ThreadPool for execution.

Code for classes to execute quick sort in parallel

SortTask interface: This interface has only a method that indicates if the task is ready to execute by a thread in the thread pool. Note that the task for quicksort will be always ready as there is no dependency.

public abstract class SortTask implements Runnable{

  public abstract boolean isReadyToProcess();

}

QuicksortTask class: This is the actual task which a thread in a thread pool executes. The root task of the quick sort divided into many such tasks based on the number of elements in the array that needs to be sorted. If the number of elements under consideration above SPLIT_THRESHOLD then it creates two subtasks otherwise sort those elements in the same task. Here we defined SPLIT_THRESHOLD to 100000 used for sorting a million elements.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class QuickSortTask extends SortTask {
  private final int SPLIT_THRESHOLD = 100000;
  private int sortArray [];
  private int iStart = 0;
  private int iEnd  =0;
  ExecutorService threadPool;
  List futureList;

  public QuickSortTask(ExecutorService threadPool, List futureList, int[] inList,int start, int end){
    this.sortArray = inList;
    this.iStart = start;
    this.iEnd   = end;
    this.threadPool =threadPool;
    this.futureList =futureList;
  }

  @Override
  public boolean isReadyToProcess() {
    return true;
  }

  public void run() {
    sort(sortArray, iStart, iEnd) ;
  }

  private   void sort(final int inList[], int start, int end) {
    int pivot = inList[start]; // consider this as  hole at inList[start],
    int leftPointer = start;
    int rightPointer = end;
    final int LEFT = 1;
    final int RIGHT = -1;
    int pointerSide = RIGHT; //  we start with right as pivot is from left

    while (leftPointer != rightPointer) {
      if (pointerSide == RIGHT) {
        if (inList[rightPointer] < pivot) {           inList[leftPointer] = inList[rightPointer];           leftPointer++;           pointerSide = LEFT;         } else {           rightPointer--;         }       } else if (pointerSide == LEFT) {         if (inList[leftPointer] > pivot) {
          inList[rightPointer] = inList[leftPointer];
          rightPointer--;
          pointerSide = RIGHT;
        } else {
          leftPointer++;
        }
      }
    }

    //put the pivot where leftPointer and rightPointer collide i.e. leftPointer == rightPointer
    inList[leftPointer]=pivot;

    if((leftPointer - start) > 1){
      if ((leftPointer - start) > SPLIT_THRESHOLD){
        futureList.add(threadPool.submit(new QuickSortTask(threadPool,futureList,inList, start, leftPointer-1)));
      }else {
        sort(inList, start, leftPointer-1);
      }
    }

    if((end - leftPointer) > 1){
      if ((end - leftPointer) > SPLIT_THRESHOLD ){
        futureList.add(threadPool.submit(new QuickSortTask(threadPool,futureList,inList, leftPointer+1, end)));
      }  else {
        sort(inList, leftPointer+1, end);
      }
    }

  }

}

QuickSortInParallel class: Main class to put everything together. Here we create an instance of ThreadPoolExecutor with 10 threads and create a root task to sort a million random elements. Also, a list to hold Futures returned by submit method of ExecutorService. The get method of Future wait until the task is completed. We remove the Feature from the list when the corresponding task is done.
Empty Features List means all tasks are done and the list is sorted.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QuickSortInParallel {

  public static void main(String [] args){
    final int  NO_OF_ELEMENT = 10000000;
    Random random = new Random();

    int [] sortArray = new int [NO_OF_ELEMENT ];
    for (int i=0; i< NO_OF_ELEMENT ;i++){
      sortArray[i] = random.nextInt(NO_OF_ELEMENT );
    }

    final ExecutorService executor = Executors.newFixedThreadPool(10);
    List futures = new Vector();
    QuickSortTask rootTask = new QuickSortTask (executor,futures,sortArray,0,sortArray.length-1)  ;
    long startTime = System.currentTimeMillis();
    futures.add(executor.submit(rootTask));
    while(!futures.isEmpty()){
 //     System.out.println("Future size " +futures.size());
      Future topFeature = futures.remove(0);
      try{
        if(topFeature!=null)topFeature.get();
      }catch(InterruptedException ie){
        ie.printStackTrace();
      }catch(ExecutionException ie){
        ie.printStackTrace();
      }
    }
    System.out.println("Time taken " + (System.currentTimeMillis() - startTime));
     for (int i=sortArray.length-1000; i<sortArray.length; i++){
       System.out.println(" element at " + i + " : " + sortArray[i]  );
    }
    executor.shutdown();
  }

}

June 4, 2011

Executing Mergesort in parallel using Java threads

Filed under: Java,Programming — MaheshT @ 4:18 pm

Merge sort is done by dividing the array recursively until there are one or two elements, sort it, and merge it back.
To run merge sort in parallel, we will divide a big array into multiple subtasks.
Here is the logic we will use.
(I) If the size of the array under consideration is bigger than the threshold then divide it and create two sub-tasks of sorting and submit those to the thread pool for execution.
(II) If subtasks of sorting divided array are done, then mere it back.
(III) If the size of the array under consideration is smaller than the threshold then sort it using mergesort in the same task.

Code for classes used in doing mergesort in parallel.

The interface implemented by each subtask

public abstract class SortTask implements Runnable{

  public abstract boolean isReadyToProcess();

}

MergeSortTask class which implements SortTask interface


import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author MaheshT
 *
 */
public class MergeSortTask extends SortTask{
  private int sortArray [];
  private int iStart = 0;
  private int iEnd  =0;
  private  int noOfSubtask =0;
  private final int SPLIT_THRESHOLD = 100000;
  private int taskDone =0;
  private MergeSortTask parentTask = null;
  private SortingThreadPool<SortTask> threadPool = null;
  private AtomicBoolean  splitDone =new AtomicBoolean(false);
  private Object waitForTaskDone = new Object();

  public MergeSortTask(SortingThreadPool<SortTask> threadPool,MergeSortTask parentTask,int [] inList,int start, int end){
    this.sortArray = inList;
    this.iStart = start;
    this.iEnd   = end;
    this.threadPool = threadPool;
    this.parentTask = parentTask;

  }

  private void splitSortNMerge(){
    splitSortNMerge(sortArray, iStart,iEnd);

  }

  private void splitSortNMerge(int inList[],int start, int end){
    int diff = end - start;
    int mid  = 0;
    int startIdxFirstArray =0;
    int endIdxFirstArray =0;
    int startIdxSecondArray=0;
    int endIdxSecondArray=0;

    if (diff > 1){ // more than two then split
      mid  = diff/2;
      startIdxFirstArray = start;
      endIdxFirstArray = start + mid;
      startIdxSecondArray= start + mid +1;
      endIdxSecondArray= end;
      if (diff > SPLIT_THRESHOLD){
        submitTaskToPool(new MergeSortTask(threadPool,this,sortArray,startIdxFirstArray, endIdxFirstArray));
        submitTaskToPool(new MergeSortTask(threadPool,this,sortArray,startIdxSecondArray, endIdxSecondArray));
        submitTaskToPool(this);
        noOfSubtask=2;
        //merge will be done by calling merge() function, when above tasks are done
        splitDone.set(true);
        return;
      }else {
        //recursive split and merge
        splitSortNMerge (inList,startIdxFirstArray, endIdxFirstArray);
        splitSortNMerge (inList,startIdxSecondArray, endIdxSecondArray);
        merge(inList,startIdxFirstArray,endIdxFirstArray,startIdxSecondArray,  endIdxSecondArray);
      }
    }else if (diff == 1){ // two element
      if (inList[start] > inList[end]){ //swap
        int tempVar = inList[start];
        inList[start] = inList[end];
        inList[end]   = tempVar;
      }
    }
  }

  private void merge(){

    int startIdxFirstArray =0;
    int endIdxFirstArray =0;
    int startIdxSecondArray=0;
    int endIdxSecondArray=0;

    int diff = iEnd - iStart;
    int mid  = 0;

    mid  = diff/2;
    startIdxFirstArray = iStart;
    endIdxFirstArray = iStart + mid;
    startIdxSecondArray=iStart + mid +1;
    endIdxSecondArray= iEnd;
    merge(sortArray,startIdxFirstArray,endIdxFirstArray,startIdxSecondArray,endIdxSecondArray);

  }

  private static void merge(int [] inList,int startIdxFirstArray,int endIdxFirstArray,
    int startIdxSecondArray, int endIdxSecondArray){

    int firstArryPtr = startIdxFirstArray;
    int secondArryPtr = startIdxSecondArray;

    int  []  tempArry = new int [endIdxSecondArray-startIdxFirstArray +1 ] ;
    //merge in sorted order
     int tempIdx =0;

     for (tempIdx=0;tempIdx < tempArry.length ; tempIdx++){
       if ( inList[firstArryPtr] < inList[secondArryPtr]){
         tempArry[tempIdx] = inList[firstArryPtr];
         firstArryPtr++;
         if (firstArryPtr > endIdxFirstArray) break;
       }else {
         tempArry[tempIdx] = inList[secondArryPtr];
         secondArryPtr++;
         if (secondArryPtr > endIdxSecondArray) break;
       }
     }
     if (firstArryPtr > endIdxFirstArray){
       while (tempIdx < tempArry.length-1){
         tempIdx++;
         tempArry[tempIdx]= inList[secondArryPtr];
         secondArryPtr++;
       }
     }else if (secondArryPtr > endIdxSecondArray){
       while (tempIdx < tempArry.length-1){
          tempIdx++;
         tempArry[tempIdx]= inList[firstArryPtr];
         firstArryPtr++;
       }
     }

     //copy sorted array
     for (int j=0;j < tempArry.length ; j++){
       inList[startIdxFirstArray+j] = tempArry[j];
     }
  }

  public boolean isReadyToProcess(){
    if (!splitDone.get()){
      return true;
    }else if (taskDone==noOfSubtask){
      return true;
    }else {
      return false;
    }
  }

  public synchronized void subTaskDone(){//
    taskDone++;
  }

  private void submitTaskToPool(MergeSortTask sortTask){
    threadPool.addTask(sortTask);
  }

  public void run(){
    if (splitDone.get()){
      this.merge();
    }else {
      this.splitSortNMerge();
     }

    if (taskDone==noOfSubtask) {
      if (parentTask !=null){
        this.parentTask.subTaskDone();
      }
      synchronized(waitForTaskDone){
        waitForTaskDone.notifyAll();
      }
    }
  }

  public  int [] get() throws InterruptedException {
    if (splitDone.get() && taskDone==noOfSubtask ){
      return sortArray;
    }else {
      synchronized(waitForTaskDone){
        waitForTaskDone.wait();
      }
    }
    return sortArray;
  }

  public String toString() {
    return "Task to sort from " + this.iStart +" To "+this.iEnd ;
  }
}

Thread pool which will execute each task when it ready to process.

import java.util.ArrayList;
import java.util.List;

public class SortingThreadPool<E extends SortTask > {

  List<E> taskList = new ArrayList<E>();
  List<WorkerThread> workers = new ArrayList<WorkerThread>();
  boolean shutdown = false;

  public SortingThreadPool(int noOfThreads){
    //add worker to connection pool
    for (int i=0; i <noOfThreads;i++){
      WorkerThread workerThread =new WorkerThread("Worker: "+i );
      workers.add(workerThread);
      workerThread.start();
    }
  }

  public void addTask(E e){
    synchronized(taskList){
      taskList.add(e);
      //System.out.println("Added task " + e);
      //done adding all tasks, notify all waiting threads.
      taskList.notifyAll();
    }

  }
  public void shutDown(){
    shutdown = true;
    synchronized(taskList){
      taskList.notifyAll();
    }
  }

  private class WorkerThread extends Thread {
    public WorkerThread(String name){
      super(name);
    }

    public void run(){
      System.out.println("Starting thread ");
      while (!shutdown || !taskList.isEmpty() ){
        //System.out.println("Reading task 2");
        try{
          synchronized(taskList){
           // System.out.println("Reading task");
            E tempTask= null;
            for (E myTask : taskList){
             // System.out.println( "Checking " +myTask);
              if ( myTask.isReadyToProcess()){
                tempTask =  myTask;
              }
            }
            if (tempTask !=null){
              //System.out.println( this.getName() + " working on " + tempTask);
              taskList.remove(tempTask);
              tempTask.run();
            }else {
              if ( !shutdown || !taskList.isEmpty()){
                System.out.println("Going to wait " + this.getName() + " size " + taskList.size());
                taskList.wait();
              }
            }
            //give other treads chance to work on tasks
            Thread.yield();
          }
        }catch(Exception e){
          e.printStackTrace();
        }
      }
      System.out.println("Exiting thread " + this.getName());
    }
  }

}

Finally, the main class will create an instance of a thread pool and submit a task of the sort a million random numbers.

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class MergeSortInParallel {

  public static void main (String [] args){
    final int  NO_OF_ELEMENT = 10000000;
    Random random = new Random();

    int [] sortArray = new int [NO_OF_ELEMENT ];
    for (int i=0; i< NO_OF_ELEMENT ;i++){
      sortArray[i] = random.nextInt(NO_OF_ELEMENT );
    } 

    //int []sortArray = {16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1};
    //int []sortArray = {10,9,8,7,6,5,4,3,2,1};
    SortingThreadPool<SortTask> threadPool = new SortingThreadPool<SortTask>(10);
    MergeSortTask sortTask = new MergeSortTask(threadPool,null,sortArray,0,sortArray.length-1);
    long startTime = System.currentTimeMillis();
    threadPool.addTask(sortTask);

    try{
      sortTask.get();
    }catch(InterruptedException ie){
      ie.printStackTrace();
    }
    System.out.println("Time taken " + (System.currentTimeMillis() - startTime));   

    threadPool.shutDown();

   // for (int i=0; i<sortArray.length; i++){
   //   System.out.println(" element at " + i + " : " + sortArray[i]  );
   // }

  }

}

June 1, 2011

Sorting : Merge sort

Filed under: Java,Programming — MaheshT @ 8:47 pm

Performance

  • Best: O(n log n)
  • Average: O(n log n)
  • Worst:O(n Log n)

Steps in mergesort

  • Recursively Keep on splitting the list until each list has 1 or 2 elements.
  • If the list has one element it is already sorted.
  • If the list has two elements then sort it.
  • Combine lists in sorted order in an upward direction.

Powered by WordPress