MaheshT.com

June 5, 2011

Executing Quicksort in parallel using Java threads

Filed under: Java,Programming — MaheshT @ 11:56 am

Running quick sort in parallel is easier in java compare to merge sort because each task in quicksort is independent of each other. (In merge sort, you need to wait for subtasks to be finished before the merge is done)
In the merge task, we keep on dividing the array until it is above the threshold and creating subtask to sort it. All subtasks are submitted to ThreadPool for execution.

Code for classes to execute quick sort in parallel

SortTask interface: This interface has only a method that indicates if the task is ready to execute by a thread in the thread pool. Note that the task for quicksort will be always ready as there is no dependency.

public abstract class SortTask implements Runnable{

  public abstract boolean isReadyToProcess();

}

QuicksortTask class: This is the actual task which a thread in a thread pool executes. The root task of the quick sort divided into many such tasks based on the number of elements in the array that needs to be sorted. If the number of elements under consideration above SPLIT_THRESHOLD then it creates two subtasks otherwise sort those elements in the same task. Here we defined SPLIT_THRESHOLD to 100000 used for sorting a million elements.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class QuickSortTask extends SortTask {
  private final int SPLIT_THRESHOLD = 100000;
  private int sortArray [];
  private int iStart = 0;
  private int iEnd  =0;
  ExecutorService threadPool;
  List futureList;

  public QuickSortTask(ExecutorService threadPool, List futureList, int[] inList,int start, int end){
    this.sortArray = inList;
    this.iStart = start;
    this.iEnd   = end;
    this.threadPool =threadPool;
    this.futureList =futureList;
  }

  @Override
  public boolean isReadyToProcess() {
    return true;
  }

  public void run() {
    sort(sortArray, iStart, iEnd) ;
  }

  private   void sort(final int inList[], int start, int end) {
    int pivot = inList[start]; // consider this as  hole at inList[start],
    int leftPointer = start;
    int rightPointer = end;
    final int LEFT = 1;
    final int RIGHT = -1;
    int pointerSide = RIGHT; //  we start with right as pivot is from left

    while (leftPointer != rightPointer) {
      if (pointerSide == RIGHT) {
        if (inList[rightPointer] < pivot) {           inList[leftPointer] = inList[rightPointer];           leftPointer++;           pointerSide = LEFT;         } else {           rightPointer--;         }       } else if (pointerSide == LEFT) {         if (inList[leftPointer] > pivot) {
          inList[rightPointer] = inList[leftPointer];
          rightPointer--;
          pointerSide = RIGHT;
        } else {
          leftPointer++;
        }
      }
    }

    //put the pivot where leftPointer and rightPointer collide i.e. leftPointer == rightPointer
    inList[leftPointer]=pivot;

    if((leftPointer - start) > 1){
      if ((leftPointer - start) > SPLIT_THRESHOLD){
        futureList.add(threadPool.submit(new QuickSortTask(threadPool,futureList,inList, start, leftPointer-1)));
      }else {
        sort(inList, start, leftPointer-1);
      }
    }

    if((end - leftPointer) > 1){
      if ((end - leftPointer) > SPLIT_THRESHOLD ){
        futureList.add(threadPool.submit(new QuickSortTask(threadPool,futureList,inList, leftPointer+1, end)));
      }  else {
        sort(inList, leftPointer+1, end);
      }
    }

  }

}

QuickSortInParallel class: Main class to put everything together. Here we create an instance of ThreadPoolExecutor with 10 threads and create a root task to sort a million random elements. Also, a list to hold Futures returned by submit method of ExecutorService. The get method of Future wait until the task is completed. We remove the Feature from the list when the corresponding task is done.
Empty Features List means all tasks are done and the list is sorted.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QuickSortInParallel {

  public static void main(String [] args){
    final int  NO_OF_ELEMENT = 10000000;
    Random random = new Random();

    int [] sortArray = new int [NO_OF_ELEMENT ];
    for (int i=0; i< NO_OF_ELEMENT ;i++){
      sortArray[i] = random.nextInt(NO_OF_ELEMENT );
    }

    final ExecutorService executor = Executors.newFixedThreadPool(10);
    List futures = new Vector();
    QuickSortTask rootTask = new QuickSortTask (executor,futures,sortArray,0,sortArray.length-1)  ;
    long startTime = System.currentTimeMillis();
    futures.add(executor.submit(rootTask));
    while(!futures.isEmpty()){
 //     System.out.println("Future size " +futures.size());
      Future topFeature = futures.remove(0);
      try{
        if(topFeature!=null)topFeature.get();
      }catch(InterruptedException ie){
        ie.printStackTrace();
      }catch(ExecutionException ie){
        ie.printStackTrace();
      }
    }
    System.out.println("Time taken " + (System.currentTimeMillis() - startTime));
     for (int i=sortArray.length-1000; i<sortArray.length; i++){
       System.out.println(" element at " + i + " : " + sortArray[i]  );
    }
    executor.shutdown();
  }

}

June 4, 2011

Executing Mergesort in parallel using Java threads

Filed under: Java,Programming — MaheshT @ 4:18 pm

Merge sort is done by dividing the array recursively until there are one or two elements, sort it, and merge it back.
To run merge sort in parallel, we will divide a big array into multiple subtasks.
Here is the logic we will use.
(I) If the size of the array under consideration is bigger than the threshold then divide it and create two sub-tasks of sorting and submit those to the thread pool for execution.
(II) If subtasks of sorting divided array are done, then mere it back.
(III) If the size of the array under consideration is smaller than the threshold then sort it using mergesort in the same task.

Code for classes used in doing mergesort in parallel.

The interface implemented by each subtask

public abstract class SortTask implements Runnable{

  public abstract boolean isReadyToProcess();

}

MergeSortTask class which implements SortTask interface


import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author MaheshT
 *
 */
public class MergeSortTask extends SortTask{
  private int sortArray [];
  private int iStart = 0;
  private int iEnd  =0;
  private  int noOfSubtask =0;
  private final int SPLIT_THRESHOLD = 100000;
  private int taskDone =0;
  private MergeSortTask parentTask = null;
  private SortingThreadPool<SortTask> threadPool = null;
  private AtomicBoolean  splitDone =new AtomicBoolean(false);
  private Object waitForTaskDone = new Object();

  public MergeSortTask(SortingThreadPool<SortTask> threadPool,MergeSortTask parentTask,int [] inList,int start, int end){
    this.sortArray = inList;
    this.iStart = start;
    this.iEnd   = end;
    this.threadPool = threadPool;
    this.parentTask = parentTask;

  }

  private void splitSortNMerge(){
    splitSortNMerge(sortArray, iStart,iEnd);

  }

  private void splitSortNMerge(int inList[],int start, int end){
    int diff = end - start;
    int mid  = 0;
    int startIdxFirstArray =0;
    int endIdxFirstArray =0;
    int startIdxSecondArray=0;
    int endIdxSecondArray=0;

    if (diff > 1){ // more than two then split
      mid  = diff/2;
      startIdxFirstArray = start;
      endIdxFirstArray = start + mid;
      startIdxSecondArray= start + mid +1;
      endIdxSecondArray= end;
      if (diff > SPLIT_THRESHOLD){
        submitTaskToPool(new MergeSortTask(threadPool,this,sortArray,startIdxFirstArray, endIdxFirstArray));
        submitTaskToPool(new MergeSortTask(threadPool,this,sortArray,startIdxSecondArray, endIdxSecondArray));
        submitTaskToPool(this);
        noOfSubtask=2;
        //merge will be done by calling merge() function, when above tasks are done
        splitDone.set(true);
        return;
      }else {
        //recursive split and merge
        splitSortNMerge (inList,startIdxFirstArray, endIdxFirstArray);
        splitSortNMerge (inList,startIdxSecondArray, endIdxSecondArray);
        merge(inList,startIdxFirstArray,endIdxFirstArray,startIdxSecondArray,  endIdxSecondArray);
      }
    }else if (diff == 1){ // two element
      if (inList[start] > inList[end]){ //swap
        int tempVar = inList[start];
        inList[start] = inList[end];
        inList[end]   = tempVar;
      }
    }
  }

  private void merge(){

    int startIdxFirstArray =0;
    int endIdxFirstArray =0;
    int startIdxSecondArray=0;
    int endIdxSecondArray=0;

    int diff = iEnd - iStart;
    int mid  = 0;

    mid  = diff/2;
    startIdxFirstArray = iStart;
    endIdxFirstArray = iStart + mid;
    startIdxSecondArray=iStart + mid +1;
    endIdxSecondArray= iEnd;
    merge(sortArray,startIdxFirstArray,endIdxFirstArray,startIdxSecondArray,endIdxSecondArray);

  }

  private static void merge(int [] inList,int startIdxFirstArray,int endIdxFirstArray,
    int startIdxSecondArray, int endIdxSecondArray){

    int firstArryPtr = startIdxFirstArray;
    int secondArryPtr = startIdxSecondArray;

    int  []  tempArry = new int [endIdxSecondArray-startIdxFirstArray +1 ] ;
    //merge in sorted order
     int tempIdx =0;

     for (tempIdx=0;tempIdx < tempArry.length ; tempIdx++){
       if ( inList[firstArryPtr] < inList[secondArryPtr]){
         tempArry[tempIdx] = inList[firstArryPtr];
         firstArryPtr++;
         if (firstArryPtr > endIdxFirstArray) break;
       }else {
         tempArry[tempIdx] = inList[secondArryPtr];
         secondArryPtr++;
         if (secondArryPtr > endIdxSecondArray) break;
       }
     }
     if (firstArryPtr > endIdxFirstArray){
       while (tempIdx < tempArry.length-1){
         tempIdx++;
         tempArry[tempIdx]= inList[secondArryPtr];
         secondArryPtr++;
       }
     }else if (secondArryPtr > endIdxSecondArray){
       while (tempIdx < tempArry.length-1){
          tempIdx++;
         tempArry[tempIdx]= inList[firstArryPtr];
         firstArryPtr++;
       }
     }

     //copy sorted array
     for (int j=0;j < tempArry.length ; j++){
       inList[startIdxFirstArray+j] = tempArry[j];
     }
  }

  public boolean isReadyToProcess(){
    if (!splitDone.get()){
      return true;
    }else if (taskDone==noOfSubtask){
      return true;
    }else {
      return false;
    }
  }

  public synchronized void subTaskDone(){//
    taskDone++;
  }

  private void submitTaskToPool(MergeSortTask sortTask){
    threadPool.addTask(sortTask);
  }

  public void run(){
    if (splitDone.get()){
      this.merge();
    }else {
      this.splitSortNMerge();
     }

    if (taskDone==noOfSubtask) {
      if (parentTask !=null){
        this.parentTask.subTaskDone();
      }
      synchronized(waitForTaskDone){
        waitForTaskDone.notifyAll();
      }
    }
  }

  public  int [] get() throws InterruptedException {
    if (splitDone.get() && taskDone==noOfSubtask ){
      return sortArray;
    }else {
      synchronized(waitForTaskDone){
        waitForTaskDone.wait();
      }
    }
    return sortArray;
  }

  public String toString() {
    return "Task to sort from " + this.iStart +" To "+this.iEnd ;
  }
}

Thread pool which will execute each task when it ready to process.

import java.util.ArrayList;
import java.util.List;

public class SortingThreadPool<E extends SortTask > {

  List<E> taskList = new ArrayList<E>();
  List<WorkerThread> workers = new ArrayList<WorkerThread>();
  boolean shutdown = false;

  public SortingThreadPool(int noOfThreads){
    //add worker to connection pool
    for (int i=0; i <noOfThreads;i++){
      WorkerThread workerThread =new WorkerThread("Worker: "+i );
      workers.add(workerThread);
      workerThread.start();
    }
  }

  public void addTask(E e){
    synchronized(taskList){
      taskList.add(e);
      //System.out.println("Added task " + e);
      //done adding all tasks, notify all waiting threads.
      taskList.notifyAll();
    }

  }
  public void shutDown(){
    shutdown = true;
    synchronized(taskList){
      taskList.notifyAll();
    }
  }

  private class WorkerThread extends Thread {
    public WorkerThread(String name){
      super(name);
    }

    public void run(){
      System.out.println("Starting thread ");
      while (!shutdown || !taskList.isEmpty() ){
        //System.out.println("Reading task 2");
        try{
          synchronized(taskList){
           // System.out.println("Reading task");
            E tempTask= null;
            for (E myTask : taskList){
             // System.out.println( "Checking " +myTask);
              if ( myTask.isReadyToProcess()){
                tempTask =  myTask;
              }
            }
            if (tempTask !=null){
              //System.out.println( this.getName() + " working on " + tempTask);
              taskList.remove(tempTask);
              tempTask.run();
            }else {
              if ( !shutdown || !taskList.isEmpty()){
                System.out.println("Going to wait " + this.getName() + " size " + taskList.size());
                taskList.wait();
              }
            }
            //give other treads chance to work on tasks
            Thread.yield();
          }
        }catch(Exception e){
          e.printStackTrace();
        }
      }
      System.out.println("Exiting thread " + this.getName());
    }
  }

}

Finally, the main class will create an instance of a thread pool and submit a task of the sort a million random numbers.

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class MergeSortInParallel {

  public static void main (String [] args){
    final int  NO_OF_ELEMENT = 10000000;
    Random random = new Random();

    int [] sortArray = new int [NO_OF_ELEMENT ];
    for (int i=0; i< NO_OF_ELEMENT ;i++){
      sortArray[i] = random.nextInt(NO_OF_ELEMENT );
    } 

    //int []sortArray = {16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1};
    //int []sortArray = {10,9,8,7,6,5,4,3,2,1};
    SortingThreadPool<SortTask> threadPool = new SortingThreadPool<SortTask>(10);
    MergeSortTask sortTask = new MergeSortTask(threadPool,null,sortArray,0,sortArray.length-1);
    long startTime = System.currentTimeMillis();
    threadPool.addTask(sortTask);

    try{
      sortTask.get();
    }catch(InterruptedException ie){
      ie.printStackTrace();
    }
    System.out.println("Time taken " + (System.currentTimeMillis() - startTime));   

    threadPool.shutDown();

   // for (int i=0; i<sortArray.length; i++){
   //   System.out.println(" element at " + i + " : " + sortArray[i]  );
   // }

  }

}

June 1, 2011

Sorting : Merge sort

Filed under: Java,Programming — MaheshT @ 8:47 pm

Performance

  • Best: O(n log n)
  • Average: O(n log n)
  • Worst:O(n Log n)

Steps in mergesort

  • Recursively Keep on splitting the list until each list has 1 or 2 elements.
  • If the list has one element it is already sorted.
  • If the list has two elements then sort it.
  • Combine lists in sorted order in an upward direction.

May 22, 2011

Sorting: QuickSort

Filed under: Java,Programming — MaheshT @ 8:50 pm

Performance

  • Best: O(n Log n)
  • Average:O(n Log n)
  • Worst: O( N^2 )

Steps in one iteration of a quick sort:

  • Select the leftmost element as pivot. Remove the pivot from the list and keep it outside.
  • There are two pointers, one in the left (left pointer ) and one in right ( right pointer). In the beginning, the left pointer points to the empty space of pivot and the right pointer at the rightmost element in the list
  • At any step, either the left pointer or right pointer element would be empty.
    Compare the non-empty element which is pointed by either right or left pointer with pivot.
  • If the right pointer element less than the pivot, move it to empty space pointed by the left pointer and move the left pointer by one element toward the center of the list.
  • If the right pointer element greater than or equal to the pivot, keep it in the same place and move the right pointer toward the center by one element.
  • If the left pointer element less than or equal to the pivot, keep it in the same place and move the left pointer toward the center by one element.
  • If the left pointer element is greater than the pivot, move to the empty space pointed by the right pointer and move the right pointer toward the center by one element.
  • When the left pointer and left pointer collides, move the pivot to that location. This will put pivot in the correct position in the list
  • Split the given list into two sub-lists divided by the pivot and run the same steps on each of them.
  • Run this recursively.

Quicksort implementation in Java

import java.util.Random;

/**
 * @author MaheshT
 *
 */
public class QuickSort {

  private  void sort(final int inList[], int start, int end) {
    int pivot = inList[start]; // consider this as  hole at inList[start],
    int leftPointer = start;
    int rightPointer = end;
    final int LEFT = 1;
    final int RIGHT = -1;
    int pointerSide = RIGHT; //  we start with right as pivot is from left

    while (leftPointer != rightPointer) {
      if (pointerSide == RIGHT) {
        if (inList[rightPointer] < pivot) {
          inList[leftPointer] = inList[rightPointer];
          leftPointer++;
          pointerSide = LEFT;
        } else {
          rightPointer--;
        }
      } else if (pointerSide == LEFT) {
        if (inList[leftPointer] > pivot) {
          inList[rightPointer] = inList[leftPointer];
          rightPointer--;
          pointerSide = RIGHT;
        } else {
          leftPointer++;
        }
      }
    }
    //put the pivot where leftPointer and rightPointer collide
    inList[leftPointer]=pivot;

    if((leftPointer - start) > 1){
      sort(inList, start, leftPointer-1);
    }

    if((end - leftPointer) > 1){
      sort(inList, leftPointer+1, end);
    }

  }

  public static void main(String[] args) {

    final int  NO_OF_ELEMENT = 10000000;
    Random random = new Random();

    int [] myArray = new int [NO_OF_ELEMENT];
    for (int i=0; i< NO_OF_ELEMENT ;i++){
      myArray[i] = random.nextInt(NO_OF_ELEMENT );
    } 

    QuickSort quickSort = new QuickSort();
   // int [] myArray = {2,8,3,9,8,2,3,2,4,3,4,5,3,4,5,8};
    long startTime = System.currentTimeMillis();
    quickSort.sort(myArray, 0, myArray.length-1);
    System.out.println("Time taken " + (System.currentTimeMillis() - startTime));

    for (int i=myArray.length-1000; i<myArray.length; i++){
      System.out.println(" element at " + i + " : " + myArray[i]  );
    }

  }

}

Powered by WordPress