Monday, January 26, 2015

Nanocube feeder in Java, part 2

Continuing from the previous post, where we prepared the DMP encoder, we now want to see how we can use it to encode and stream data into Nanocube. Because the Nanocube process reads its data from standard input, we first have to start the process in order to obtain that input stream.

We will use the ProcessBuilder class to set up everything needed to start Nanocube. Configuration-wise, we only need to point it at the directory where the Nanocube binaries are located (in particular, the nanocube-leaf command).
 private final static Logger appLogger = Logger.getLogger("Feeder");
 private final static Logger nanocubeOutLogger = Logger.getLogger("Nanocube OUT");
 ...
 ...
 String nanocubeBinPath = "/home/vmarcinko/nanocube/bin";
 // -q sets the query port, -f the insertion report frequency
 ProcessBuilder pb = new ProcessBuilder(nanocubeBinPath + "/nanocube-leaf", "-q", "29512", "-f", "10000");

 // nanocube-leaf expects NANOCUBE_BIN to point at the binaries directory
 Map<String, String> env = pb.environment();
 env.put("NANOCUBE_BIN", nanocubeBinPath);

 // pipe both ends of the process, merging stderr into stdout
 pb.redirectOutput(ProcessBuilder.Redirect.PIPE);
 pb.redirectInput(ProcessBuilder.Redirect.PIPE);
 pb.redirectErrorStream(true);

 appLogger.info("Starting Nanocube process...");
 Process nanocubeProcess = pb.start();

 // read the process output on a separate thread (see below)
 ExecutorService executorService = Executors.newSingleThreadExecutor();
 startNanocubeOutputLoggingTask(executorService, nanocubeProcess);

 // writable end of the process standard-input pipe
 OutputStream inPipeOutputStream = nanocubeProcess.getOutputStream();
 ...
 ...
As seen above, once we start the Nanocube process we grab the writable end of its standard-input "pipe" in the form of an OutputStream, and we will use that stream to feed data into Nanocube.

One interesting piece above is the way we handle the Nanocube process output. Since we merged the error stream into standard output (redirectErrorStream(true)), a single task started on another thread (hence java.util.concurrent.Executor) reads all of that output and prints it via whatever logging framework our Java application uses (here, plain java.util.logging loggers). Here is the relevant startNanocubeOutputLoggingTask method:
 private static void startNanocubeOutputLoggingTask(Executor executor, final Process nanocubeProcess) {
   InputStream outPipeInputStream = nanocubeProcess.getInputStream();
   Runnable runnable = new Runnable() {
     @Override
     public void run() {
       // read the combined stdout/stderr of the Nanocube process line by line
       try (BufferedReader reader = new BufferedReader(new InputStreamReader(outPipeInputStream))) {
         String line;
         while ((line = reader.readLine()) != null) {
           nanocubeOutLogger.info(line);
         }
       } catch (IOException e) {
         appLogger.log(Level.SEVERE, "Error reading from Nanocube output: " + e.getMessage(), e);
       }
     }
   };
   // start logging task
   executor.execute(runnable);
 }
   
By keeping the ExecutorService instance around, we can use it to stop the logging task at application shutdown.
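For example, a JVM shutdown hook can take care of both the process and the logging task. This is just a minimal sketch, assuming the nanocubeProcess and executorService variables from the snippets above (both would need to be final to be captured here):
 // Minimal sketch (not from the original feeder): clean up the Nanocube
 // process and the logging task when the JVM shuts down.
 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
   @Override
   public void run() {
     nanocubeProcess.destroy();     // terminate the Nanocube process
     executorService.shutdownNow(); // interrupt the output logging task
   }
 }));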

And finally, with the code above and an instance of NanocubeDmpEncoder ready (see the previous post), we perform the feeding:
 appLogger.info("Streaming DMP content into Nanocube process...");  
   
 NanocubeDmpEncoder dmpEncoder = ...;  

 byte[] headerBytes = dmpEncoder.encodeHeaders();
 inPipeOutputStream.write(headerBytes);

 byte[] recordBytes = dmpEncoder.encodeRecord(....);
 inPipeOutputStream.write(recordBytes);

 inPipeOutputStream.flush();
 inPipeOutputStream.close(); // closing the pipe signals end-of-input to Nanocube
   
 appLogger.info("Streaming of data finished");  
   
 nanocubeProcess.waitFor();  
 executorService.shutdown();  
 appLogger.info("Nanocube process stopped");  
   
At the end, we wait for the Nanocube process to finish before shutting down the executor that runs our output logging task.
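One last practical note: when feeding a large number of records, it may pay off to wrap the pipe in a BufferedOutputStream so that each encoded record doesn't hit the underlying pipe with a separate write. A minimal sketch (the buffer size is arbitrary):
 // Sketch: batch per-record writes through a buffer before they reach
 // the process pipe (64 KB buffer size chosen arbitrarily).
 OutputStream bufferedOut = new BufferedOutputStream(inPipeOutputStream, 64 * 1024);
 bufferedOut.write(dmpEncoder.encodeHeaders());
 // ... write encoded records via bufferedOut.write(dmpEncoder.encodeRecord(...)) ...
 bufferedOut.flush();
 bufferedOut.close(); // also closes the underlying pipe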


Wednesday, January 21, 2015

Nanocube feeder in Java, part 1

For some time I've been sporadically following the Nanocube project, a novel way to index spatio-temporal data entirely in memory, enabling very fast analytical querying and visualization of fairly large datasets. To get a rough impression of Nanocube's memory consumption: for up to 10 billion records it can require around a few tens of GB of RAM, while querying remains fast enough for real-time interactive exploration of such a huge pool of data.

Note - this post is based on nanocube version 3.1.

The currently provided way to feed data into nanocube is via a Python command line tool - nanocube-binning-csv.py.
In a nutshell, this tool consumes CSV datasets and converts them into the so-called "dmp" format, which is then piped into the nanocube server. This can be a cumbersome way for some use cases, and being a Java developer I wanted a Java-friendly way to feed the data, so I set out to develop my own feeder in Java.

DMP Encoder

The biggest portion of my feeder is the DMP encoder, which converts input data, given as plain Java objects, into the DMP format required by the nanocube server. Since documentation about the DMP format is somewhat scarce, the easiest approach seemed to be analyzing the Python code within the nanocube-binning-csv.py tool and translating it to Java.

The resulting encoder is not the most generic one, but it focuses on the most common use case - the following dimensions: spatial, temporal, count, and an arbitrary number of categorical dimensions. Of course, you accordingly need nanocube binaries compiled for the given combination of dimensions and their details (such as byte lengths of certain dimensions, zoom level, etc.), and a few of them can be found in the nanocube-dir/bin directory, one example being nc_q25_c1_c1_u2_u4 (the name apparently encodes the schema: a quadtree with 25 zoom levels, two 1-byte categorical dimensions, 2-byte time values and 4-byte counts).

The whole source code for the DMP encoder is available here.

The way to use it is as follows.

You first instantiate the encoder by specifying the schema for the dataset. The constructor arguments are:
  • name
  • number of location zoom levels (e.g. 25 is typically sufficient)
  • time offset
  • length of time bin in seconds
  • length of time value in bytes (2, 4 or 8)
  • map that describes categorical dimensions and their options
 NanocubeDmpEncoder dmpEncoder = new NanocubeDmpEncoder("telco-data", 25, timeOffset, 3600, 2, categoryEnumerations);  
The map of category data is actually a two-level map. The top-level map contains entries where keys are category names and values are again maps, containing the enum option specifications for the given category. Each enum options map ties a domain-specific option value to a (label, byte) pair, which specifies how that option will be presented (the label) and how it will be encoded in nanocube (the byte value).

For example, if we want to have categories "NetType" and "Status", representing the network type and status of some mobile call, then in Java we would usually use enum classes, such as:
 public enum NetType {  
   GSM, GPRS, UMTS, EDGE  
 }  
 public enum Status {  
   SUCCEEDED, FAILED  
 }  
So we can define our category definition map something like this:
 Map<String, Map<Object, CategoryEnumInfo>> categoryEnumerations = new HashMap<>();  
 categoryEnumerations.put("NetType", createEnumInfosFromEnumClass(NetType.class));  
 categoryEnumerations.put("Status", createEnumInfosFromEnumClass(Status.class));  
 ....  
 private <E extends Enum<E>> Map<Object, CategoryEnumInfo> createEnumInfosFromEnumClass(Class<E> enumClass) {
   Map<Object, CategoryEnumInfo> enumInfos = new HashMap<>();
   for (E constant : enumClass.getEnumConstants()) {
     enumInfos.put(constant, new CategoryEnumInfo(constant.name(), (byte) constant.ordinal()));
   }
   return enumInfos;
 }
Now that we have a NanocubeDmpEncoder instance constructed, we can encode the headers and records into the required DMP format.
 // headers  
 byte[] headerBytes = dmpEncoder.encodeHeaders();  
 // record  
 Map<String, Object> categoryValues = new HashMap<>();  
 categoryValues.put("NetType", NetType.GSM);  
 categoryValues.put("Status", Status.SUCCEEDED);  
 byte[] recordBytes = dmpEncoder.encodeRecord(44.8789661034259, 17.72673345412568, new Date(), 1, categoryValues);  
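The encoded bytes can go anywhere. For instance, to sanity-check the encoder, you could dump them to a .dmp file and later pipe that file into the nanocube server yourself. A minimal sketch (the file name is arbitrary):
 // Sketch: write the encoded headers followed by the encoded record
 // to a file ("telco-data.dmp" is an arbitrary name) for inspection
 // or manual piping into the nanocube server.
 try (OutputStream out = new FileOutputStream("telco-data.dmp")) {
   out.write(headerBytes);
   out.write(recordBytes);
 }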

In the second part of this post we will see how to boot the Nanocube server from a Java process and how to stream the encoded data into it.