Building Inverted Index using MapReduce and HBase Java Client API | Proof of Concept (POC) 1



Inverted Index

This Proof of Concept (POC) involves creating an inverted index of the contents of all the books, along with the title, author, and any book metadata (e.g., language). An inverted index is a mapping of words to their location in a set of documents. Most modern search engines utilize some form of an inverted index to process user-submitted queries. In its most basic form, an inverted index is a simple hash table which maps words in the documents to some sort of document identifier. For example, if given the following two documents:

Doc1 (if you don’t know the story, search for this on Wikipedia):

Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo.

Doc2:

Buffalo are mammals.

You could construct the following inverted file index:

Buffalo -> Doc1<3>, Doc2<1>
buffalo -> Doc1<4>
buffalo. -> Doc1<1>
are -> Doc2<1>
mammals. -> Doc2<1>



The input files are:

Create input directory structure in HDFS:

hdfs dfs -mkdir /POCs/POC1/input_data

Load the input files into the HDFS directory (/POCs/POC1/input_data):

hdfs dfs -copyFromLocal /home/demouser/DataMaking/POCs/POC1 /POCs/POC1/input_data

MapReduce Job Java Source Code:

BuildingInvertedIndexDriver.java

package com.mr.invertedindex;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BuildingInvertedIndexDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("Please provide: [input path] [output path]");
System.exit(-1);
}

Job job = Job.getInstance(getConf());
job.setJobName("Building InvertedIndex");
job.setJarByClass(BuildingInvertedIndexDriver.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(BuildingInvertedIndexMapper.class);
job.setReducerClass(BuildingInvertedIndexReducer.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(1);

Path inputFilePath = new Path(args[0]);
Path outputFilePath = new Path(args[1]);

FileInputFormat.addInputPath(job, inputFilePath);
FileOutputFormat.setOutputPath(job, outputFilePath);

/*
* Delete output filepath if already exists
*/
FileSystem fs = FileSystem.newInstance(getConf());

if (fs.exists(outputFilePath)) {
fs.delete(outputFilePath, true);
}

return job.waitForCompletion(true) ? 0: 1;
}

public static void main(String[] args) throws Exception {
BuildingInvertedIndexDriver buildingInvertedIndexDriver = new BuildingInvertedIndexDriver();
int res = ToolRunner.run(buildingInvertedIndexDriver, args);
System.exit(res);
}
}




BuildingInvertedIndexMapper.java

package com.mr.invertedindex;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class BuildingInvertedIndexMapper extends
Mapper&lt;Object, Text, Text, Text&gt; {

public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {

Text wordAsKey = new Text();

StringTokenizer stringTokenizer = new StringTokenizer(value.toString(), ")(}{][\",. \t\n");

/*String fileName = new String(((FileSplit) (context.getInputSplit()))
.getPath().toString());*/

String fileName = new String(((FileSplit) (context.getInputSplit())).getPath().getName().toString());

Text fileNameAsValue = new Text(fileName);

while (stringTokenizer.hasMoreTokens()) {
wordAsKey.set(stringTokenizer.nextToken());

// Emits Intermediate Key and Value pairs
context.write(wordAsKey, fileNameAsValue);
}
}
}




BuildingInvertedIndexReducer.java

package com.mr.invertedindex;

import java.io.IOException;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class BuildingInvertedIndexReducer extends
Reducer&lt;Text, Text, Text, Text&gt; {

public void reduce(Text wordAsKey, Iterable fileNameAsValues, Context context)
throws IOException, InterruptedException {

Hashtable&lt;String, Long&gt; tableAsFileNameAndWordCount = new Hashtable&lt;String, Long&gt;();

for (Text fileNameAsValue : fileNameAsValues) {
if (tableAsFileNameAndWordCount.containsKey(fileNameAsValue.toString())) {
Long tempWordCount = tableAsFileNameAndWordCount.get(fileNameAsValue.toString());
tempWordCount = tempWordCount.longValue() + 1;
tableAsFileNameAndWordCount.put(fileNameAsValue.toString(), tempWordCount);
} else
tableAsFileNameAndWordCount.put(fileNameAsValue.toString(), new Long(1));
}
StringBuffer resultAsFileNameAndWordCount = new StringBuffer();
Enumeration enumerationAsFileName = tableAsFileNameAndWordCount.keys();
while (enumerationAsFileName.hasMoreElements()) {
String tempKeyAsFileName = enumerationAsFileName.nextElement().toString();
Long tempValueAsWordCount = (Long) tableAsFileNameAndWordCount.get(tempKeyAsFileName);
resultAsFileNameAndWordCount.append("&lt;"); resultAsFileNameAndWordCount.append(tempKeyAsFileName); resultAsFileNameAndWordCount.append(" , "); resultAsFileNameAndWordCount.append(tempValueAsWordCount.toString()); resultAsFileNameAndWordCount.append("&gt; ");
}
context.write(wordAsKey, new Text(resultAsFileNameAndWordCount.toString()));
}
}




Run the MapReduce job to build the inverted index:

hadoop jar /home/demouser/DataMaking/java_workarea/BuildingInvertedIndex/target/BuildingInvertedIndex-1.0-SNAPSHOT.jar com.mr.invertedindex.BuildingInvertedIndexDriver /POCs/POC1/input_data /POCs/POC1/output_data



Output files of Inverted index in HDFS:







In HBase Shell, run the following commands:

Create HBase table,

hbase shell

create 'invertedindex','invertedindex_info'

In Terminal, run the following commands:

Generate HFiles for the HBase table created above from the output of the Inverted Index MapReduce job, using the ImportTsv command-line utility,

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.bulk.output=/import_indexed_output -Dimporttsv.columns=HBASE_ROW_KEY,invertedindex_info:filenamewithwordcount invertedindex /POCs/POC1/output_data/part-r-00000

Change the owner of the generated HFiles directory in HDFS to the "hbase" user so that HBase can load the files,

hdfs dfs -chown -R hbase:hbase /import_indexed_output

Load HBase table's HFiles into HBase table using LoadIncrementalHFiles command-line utility,

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /import_indexed_output invertedindex

Search word program using HBase API(Java)

SearchOnIndexedDataDemo.java

package com.hbasetable.indexeddata;

import java.io.Console;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Interactive console program that looks up words in the {@code invertedindex}
 * HBase table and prints the stored "file name with word count" value.
 *
 * <p>NOTE(review): the {@code HTable} constructor used here is deprecated in newer
 * HBase client versions — confirm against the HBase version this project targets.
 */
public class SearchOnIndexedDataDemo {

    public static Configuration configuration;
    static {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.zookeeper.quorum", "localhost");
        configuration.set("hbase.master", "localhost:60000");
    }

    /**
     * Prompts for a word on the system console and starts the search loop.
     * Does nothing except print a hint when no interactive console is attached.
     */
    public static void main(String[] args) {
        String _tableName = "invertedindex";

        try {
            Console _console = System.console();
            if (_console == null) {
                // System.console() returns null when the JVM has no interactive console.
                System.err.println("No interactive console available; run this program from a terminal.");
                return;
            }

            System.out.println("\n");
            // read line from the user input
            String _inputWord = _console
                    .readLine("Enter the word to search on Indexed Data: ");

            SearchOnIndexedDataDemo searchOnIndexedDataDemo = new SearchOnIndexedDataDemo();
            searchOnIndexedDataDemo.getIndexedDataBasedOnInput(_tableName, _console, _inputWord);
        } catch (Exception ex) {
            // if any error occurs
            ex.printStackTrace();
        }
    }

    /**
     * Searches for {@code _inputWord}, prints the result and keeps prompting for
     * further words until the user enters {@code 0} or the console input ends.
     *
     * <p>The table is opened once for the whole session and is always closed when
     * the loop ends, including when a lookup throws.
     *
     * @param _tableName name of the HBase table holding the index
     * @param _console   console used for the follow-up prompts
     * @param _inputWord first word to search; {@code null} ends the session at once
     * @throws IOException if the table cannot be opened or a lookup fails
     */
    private void getIndexedDataBasedOnInput(String _tableName, Console _console, String _inputWord) throws IOException {
        try (HTable _table = new HTable(configuration, _tableName)) {
            String _currentWord = _inputWord;
            // readLine() returns null at end of input, which also ends the session.
            while (_currentWord != null) {
                printSearchResult(_table, _currentWord);

                System.out.println("\n");
                // read line from the user input
                _currentWord = _console
                        .readLine("Enter 0 to exit, or Enter another word to search on Indexed Data: ");
                if (isExitCommand(_currentWord)) {
                    break;
                }
            }
        }
    }

    /** Looks up one word (used as the row key) and prints the framed result block. */
    private void printSearchResult(HTable _table, String _word) throws IOException {
        String _columnFamily = "invertedindex_info";
        String _columnQualifier = "filenamewithwordcount";

        System.out
                .println("Search Results Starts Here ..................................................................");
        System.out.println("\n");

        // An empty input is reported as "no results" without querying the table.
        // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset.
        Result _result = _word.isEmpty() ? null : _table.get(new Get(Bytes.toBytes(_word)));
        if (_result == null || _result.isEmpty()) {
            System.out.println("No results found. ");
        } else {
            System.out.println("Word -----------------------> " + Bytes.toString(_result.getRow()));
            System.out.println("File name with Word count --> " + Bytes.toString(_result.getValue(Bytes.toBytes(_columnFamily), Bytes.toBytes(_columnQualifier))));
        }

        System.out.println("\n");
        System.out
                .println("Search Results Ends Here ....................................................................");
    }

    /**
     * Tells whether the follow-up input asks to quit. Only a literal {@code 0}
     * (ignoring surrounding whitespace) quits, so other numbers remain searchable.
     */
    private boolean isExitCommand(String _inputData) {
        return _inputData != null && "0".equals(_inputData.trim());
    }
}




Run the program with the Maven (mvn) utility to search for a word in the inverted index that was built:

mvn exec:java



Happy Learning !!!

Post a Comment

0 Comments