Category Archives: Data Structure

Dealing large sequence data

Dealing large sequence data

External merge sort

If the size of the file is too big to be held in the memory during sorting. This algorithm minimizes the number of disk accesses and improves the sorting performance.

External sorting is required when the data being sorted do not fit into the main memory of a computing device (usually RAM) and instead they must reside in the slower external memory (usually a hard drive). External sorting typically uses a sort-merge strategy. In the sorting phase, chunks of data small enough to fit in main memory are read, sorted, and written out to a temporary file. In the merge phase, the sorted sub files are combined into a single larger file.

In raw manner:

Let’s say your data set has one million elements and you can only fit 100,000 in memory at a time. Here’s what I’d do:

  • read in the first 100,000, sort them and write that sorted list back out.
  • do this for each group of 100,000.
  • run a merge operation on the 10 groups.

In other words, once your 10 groups are sorted within the group, grab the first entry from each group.

Then write that the lowest of those 10 (which is the lowest of the whole million) to the output file and read the next one from that group in its place.

Then just continue selecting the lowest of the 10, writing it out and replacing it from the same group. In that way, the final output is the entire sorted list of a million entries.

Steps:

  1. Split the original file into two equal-sized run files.
  2. Read one block from each run file into input buffers.
  3. Take the first record from each input buffer, and write a run of length two to an output buffer in sorted order.
  4. Take the next record from each input buffer, and write a run of length two to a second output buffer in sorted order.
  5. Repeat until finished, alternating output between the two output run buffers. Whenever the end of an input block is reached, read the next block from the appropriate input file. When an output buffer is full, write it to the appropriate output file.
  6. Repeat steps 2 through 5, using the original output files as input files. On the second pass, the first two records of each input run file are already in sorted order. Thus, these two runs may be merged and output as a single run of four elements.
  7. Each pass through the run files provides larger and larger runs until only one run remains.

This algorithm can easily take advantage of double buffering . Note that the various passes read the input run files sequentially and write the output run files sequentially. For sequential processing and double buffering to be effective, however, it is necessary that there be a separate I/O head available for each file. This typically means that each of the input and output files must be on separate disk drives, requiring a total of four disk drives for maximum efficiency.

 Improving Performance

One example of external sorting is the external merge sort algorithm, which sorts chunks that each fit in RAM, then merges the sorted chunks together. For example, for sorting 900 megabytes of data using only 100 megabytes of RAM:

1) Read 100 MB of the data in main memory and sort by some conventional method, like quicksort.
2) Write the sorted data to disk.
3) Repeat steps 1 and 2 until all of the data is in sorted 100 MB chunks (there are 900MB / 100MB = 9 chunks), which now need to be merged into one single output file.
4) Read the first 10 MB (= 100MB / (9 chunks + 1)) of each sorted chunk into input buffers in main memory and allocate the remaining 10 MB for an output buffer. (In practice, it might provide better performance to make the output buffer larger and the input buffers slightly smaller.)
5) Perform a 9-way merge and store the result in the output buffer. If the output buffer is full, write it to the final sorted file, and empty it. If any of the 9 input buffers gets empty, fill it with the next 10 MB of its associated 100 MB sorted chunk until no more data from the chunk is available.

Optimizing a little with additional passes –
1) For sorting, say, 50 GB in 100 MB of RAM, using a single merge pass isn’t efficient: the disk seeks required to fill the input buffers with data from each of the 500 chunks (we read 100MB / 501 ~ 200KB from each chunk at one time) take up most of the sort time. Using two merge passes solves the problem. Then the sorting process might look like this:
2) Run the initial chunk-sorting pass as before.
3) Run a first merge pass combining 25 chunks at a time, resulting in 20 larger sorted chunks.
4) Run a second merge pass to merge the 20 larger sorted chunks.

Implementation –
Sort a 4Gb file in 10Mb memory (or a system in which many processes run and total memory of that system is x Mb)

Java code: 

package main.java.algo.sorting;
import java.util.*;
import java.io.*;
// Goal: offer a generic external-memory sorting program in Java.
//
// It must be :
//  - hackable (easy to adapt)
//  - scalable to large files
//  - sensibly efficient.
// This software is in the public domain.
public class ExternalSort {
    
    
    // we divide the file into small blocks. If the blocks
    // are too small, we shall create too many temporary files.
    // If they are too big, we shall be using too much memory.
    public static long estimateBestSizeOfBlocks(File filetobesorted) {
        long sizeoffile = filetobesorted.length();
        // we don't want to open up much more than 1024 temporary files, better run
        // out of memory first. (Even 1024 is stretching it.)
        final int MAXTEMPFILES = 1024;
        long blocksize = sizeoffile / MAXTEMPFILES ;
        // on the other hand, we don't want to create many temporary files
        // for naught. If blocksize is smaller than half the free memory, grow it.
        long freemem = Runtime.getRuntime().freeMemory();
        if( blocksize < freemem/2)
            blocksize = freemem/2;
        else {
            if(blocksize >= freemem)
              System.err.println("We expect to run out of memory. ");
        }
        return blocksize;
    }
     // This will simply load the file by blocks of x rows, then
     // sort them in-memory, and write the result to a bunch of
     // temporary files that have to be merged later.
     //
     // @param file some flat  file
     // @return a list of temporary flat files
    public static List<File> sortInBatch(File file, Comparator<String> cmp) throws IOException {
        List<File> files = new ArrayList<File>();
        BufferedReader fbr = new BufferedReader(new FileReader(file));
        long blocksize = estimateBestSizeOfBlocks(file);// in bytes
        try{
            List<String> tmplist =  new ArrayList<String>();
            String line = "";
            try {
                while(line != null) {
                    long currentblocksize = 0;// in bytes
                    while((currentblocksize < blocksize)
                    &&(   (line = fbr.readLine()) != null) ){ // as long as you have 2MB
                        tmplist.add(line);
                        currentblocksize += line.length() // 2 + 40; // java uses 16 bits per character + 40 bytes of overhead (estimated)
                    }
                    files.add(sortAndSave(tmplist,cmp));
                    tmplist.clear();
                }
            } catch(EOFException oef) {
                if(tmplist.size()>0) {
                    files.add(sortAndSave(tmplist,cmp));
                    tmplist.clear();
                }
            }
        } finally {
            fbr.close();
        }
        return files;
    }
    public static File sortAndSave(List<String> tmplist, Comparator<String> cmp) throws IOException  {
        Collections.sort(tmplist,cmp);  //
        File newtmpfile = File.createTempFile("sortInBatch", "flatfile");
        newtmpfile.deleteOnExit();
        BufferedWriter fbw = new BufferedWriter(new FileWriter(newtmpfile));
        try {
            for(String r : tmplist) {
                fbw.write(r);
                fbw.newLine();
            }
        } finally {
            fbw.close();
        }
        return newtmpfile;
    }
     // This merges a bunch of temporary flat files
     // @param files
     // @param output file
     // @return The number of lines sorted. (P. Beaudoin)
    public static int mergeSortedFiles(List<File> files, File outputfile, final Comparator<String> cmp) throws IOException {
        PriorityQueue<BinaryFileBuffer> pq = new PriorityQueue<BinaryFileBuffer>(11,
            new Comparator<BinaryFileBuffer>() {
              public int compare(BinaryFileBuffer i, BinaryFileBuffer j) {
                return cmp.compare(i.peek(), j.peek());
              }
            }
        );
        for (File f : files) {
            BinaryFileBuffer bfb = new BinaryFileBuffer(f);
            pq.add(bfb);
        }
        BufferedWriter fbw = new BufferedWriter(new FileWriter(outputfile));
        int rowcounter = 0;
        try {
            while(pq.size()>0) {
                BinaryFileBuffer bfb = pq.poll();
                String r = bfb.pop();
                fbw.write(r);
                fbw.newLine();
                ++rowcounter;
                if(bfb.empty()) {
                    bfb.fbr.close();
                    bfb.originalfile.delete();// we don't need you anymore
                } else {
                    pq.add(bfb); // add it back
                }
            }
        } finally {
            fbw.close();
            for(BinaryFileBuffer bfb : pq ) bfb.close();
        }
        return rowcounter;
    }
    public static void main(String[] args) throws IOException {
        if(args.length<2) {
            System.out.println("please provide input and output file names");
            return;
        }
        String inputfile = args[0];
        String outputfile = args[1];
        Comparator<String> comparator = new Comparator<String>() {
            public int compare(String r1, String r2){
                return r1.compareTo(r2);}};
        List<File> l = sortInBatch(new File(inputfile), comparator) ;
        mergeSortedFiles(l, new File(outputfile), comparator);
    }
}
class BinaryFileBuffer  {
    public static int BUFFERSIZE = 2048;
    public BufferedReader fbr;
    public File originalfile;
    private String cache;
    private boolean empty;
    
    public BinaryFileBuffer(File f) throws IOException {
        originalfile = f;
        fbr = new BufferedReader(new FileReader(f), BUFFERSIZE);
        reload();
    }
    
    public boolean empty() {
        return empty;
    }
    
    private void reload() throws IOException {
        try {
          if((this.cache = fbr.readLine()) == null){
            empty = true;
            cache = null;
          }
          else{
            empty = false;
          }
      } catch(EOFException oef) {
        empty = true;
        cache = null;
      }
    }
    
    public void close() throws IOException {
        fbr.close();
    }
    
    
    public String peek() {
        if(empty()) return null;
        return cache.toString();
    }
    public String pop() throws IOException {
      String answer = peek();
        reload();
      return answer;
    }  
    
 }
Code reference:  http://www.ashishsharma.me/2011/08/external-merge-sort.html

Given an infinite stream of integers with only k distinct integers, find those distinct integers. How would you modify your solution if k is also very large and cannot use hash table

Keep as many elements in the hash_map as you can. Once the map capacity is exhausted, sort all the elements in the hash_map and write them to disk and build a bloom filter. Keep this bloom filter in memory, empty the hash_map and now for every new number in the stream, check in the bloom filter, if it says does not exist, that means this is a new number, insert in the map. If it says the number does exist, then this can be a false positive, then first check in the map, if not found in it, it could be on disk.

Use a binary search on the file (this will take O(logn) disk seeks) and verify if its present in the file or not. If present, ignore, else add in the map. Keep repeating the process of flushing the map to disk whenever the map is full. To flush, you could either use any external sorting method, or just write out a new file each time. If separate files, you will need to binary search in all of them in case of false positive.

How would one sort 1 billion integers.  .. I gave external sorting answer but he waned more .. What should i say ? Asked me to tell him how would the problem change  if total of  4 threads are given. How would you solve ?

I would suggest splitting the data into four pieces and having each
thread do an external sort on one piece. Essentially that means
breaking up the data into chunks which can be stored in memory,
sorting them, and then merging the chunks back into one piece. After the 4 threads are done sorting their piece, use two threads to merge the four pieces into two pieces, and then use one thread to merge those into the one final output file.