Tuesday, March 1, 2016

First Clojure workshop in Osijek, Croatia


On 9th of March, 2016, I'll be holding Clojure workshop in my hometown Osijek, Croatia. I wanna thank Osijek Software City for providing me space, equipment and advertising. Workshop will be held in Croatian language, and anyone who is interested can can apply here for attendance.

In Osijek, where in last few years IT scene is growing with tremendous pace, Clojure is practically still unknown language, so I hope this way I will raise some interest in such an awesome language.

Monday, April 27, 2015

Setting up Undertow with Spring MVC

In case you still don't know about it, Undertow is web server developed under JBoss community, and is praised a lot for its impressive performance. I usually deployed my web application in .war form under some servlet container (mostly Tomcat), but now I wanted to try out embedded servlet container, and Undertow had seemed especially good for that considering its small footprint.

I am also heavy Spring user, so I wanted to integrate this web server with Spring MVC, but only examples so far where I could find Spring-Undertow integration is within Spring Boot project, and for some reason I still use plain Spring Framework, so I set out to do it my own.

Of course, as everything in Spring is a bean, I developed my own UndertowServer bean which was pretty straightforward, and bigger question was how to define my web application deployment in it because there are couple of ways to do it. I decided to use Servlet 3.0+ ServletContainertInitializer (one other option would be to straight use Undertow's API to specify web application deployment).

Here is piece of code from our implementation of this interface:
 public class WebAppServletContainerInitializer implements ServletContainerInitializer, ApplicationContextAware {  
   private ApplicationContext applicationContext;  

   @Override  
   public void onStartup(Set<Class<?>> c, ServletContext ctx) throws ServletException {  
     XmlWebApplicationContext rootWebAppContext = new XmlWebApplicationContext();  
     rootWebAppContext.setConfigLocation("/WEB-INF/applicationContext.xml");  
     rootWebAppContext.setParent(applicationContext);  
     ctx.addListener(new ContextLoaderListener(rootWebAppContext));
  
     FilterRegistration.Dynamic encodingFilter = ctx.addFilter("encoding-filter", CharacterEncodingFilter.class);  
     encodingFilter.setInitParameter("encoding", "UTF-8");  
     encodingFilter.setInitParameter("forceEncoding", "true");  
     encodingFilter.addMappingForServletNames(EnumSet.allOf(DispatcherType.class), false, "admin");  

     FilterRegistration.Dynamic springSecurityFilterChain = ctx.addFilter("springSecurityFilterChain", DelegatingFilterProxy.class);  
     springSecurityFilterChain.addMappingForServletNames(EnumSet.allOf(DispatcherType.class), false, "admin");  
     ServletRegistration.Dynamic dispatcher = ctx.addServlet("admin", DispatcherServlet.class);  
     dispatcher.setLoadOnStartup(1);  
     dispatcher.addMapping("/admin/*");  
 ...  
 ...  

We had to inject our main ApplicationContext (that contains UndertowServer bean) via ApplicationContextAware marker, to be able to use it as parent context for root WebApplicationContext that we put under /WEB-INF/ together with other DispatcherServlet contexts.

Later on, we use this ServletContainertInitializer implementation during startup phase of Undertow server to construct DeploymentInfo object:
 InstanceFactory<? extends ServletContainerInitializer> instanceFactory = new ImmediateInstanceFactory<>(servletContainerInitializer);  
 ServletContainerInitializerInfo sciInfo = new ServletContainerInitializerInfo(WebAppServletContainerInitializer.class, instanceFactory, new HashSet<>());  

 DeploymentInfo deploymentInfo = constructDeploymentInfo(sciInfo);  

FInal result is that we can now readily use this web server bean in our Spring XML deployment descriptors, such as:
 <bean class="vmarcinko.undertow.UndertowServer">  
   <property name="port" value="8080"/>  
   <property name="webAppName" value="myapp"/>  
   <property name="webAppRoot" value="${distribution.dir}/web-app-root"/>  
   <property name="servletContainerInitializer">  
     <bean class="vmarcinko.web.admin.WebAppServletContainerInitializer"/>  
   </property>  
 </bean>  

If you ask me why I still use Spring XML in 2015 instead of Java config - well, I think XML is still nicer DSL for describing deployment than Java, but that's just me :)

Anyway, when you boot the system, this little web server will be up and running under specified port, serving this single web application under given root directory. In the example above, my administration web console (specific DispatcherServlet serving that under '/admin/*') would be available under:

http://localhost:8080/myapp/admin

Whole code for both classes is available at this Gist.



Monday, January 26, 2015

Nanocube feeder in Java, part 2

Continuing on the previous post where we prepared DMP encoder, we now want to see how can we use it to encode and stream the data into Nanocube. And because Nanocube process uses standard input for data provision, we have to first start the process in order to obtain that input stream.

We will use ProcessBuilder class to set up everything needed to start the Nanocube. Configuration-wise, we only need to set up the directory where Nanocube binaries are present (in particular nanocube-leaf command).
 private final static Logger appLogger = Logger.getLogger("Feeder");  
 private final static Logger nanocubeOutLogger = Logger.getLogger("Nanocube OUT");  
 ...  
 ...  
 String nanocubeBinPath = "/home/vmarcinko/nanocube/bin";  
 ProcessBuilder pb = new ProcessBuilder(nanocubeBinPath + "/nanocube-leaf", "-q", "29512", "-f", "10000");  
   
 Map<String, String> env = pb.environment();  
 env.put("NANOCUBE_BIN", nanocubeBinPath);  
   
 pb.redirectOutput(ProcessBuilder.Redirect.PIPE);  
 pb.redirectInput(ProcessBuilder.Redirect.PIPE);  
 pb.redirectErrorStream(true);  
   
 appLogger.info("Starting Nanocube process...");  
 Process nanocubeProcess = pb.start();  
   
 ExecutorService executorService = Executors.newSingleThreadExecutor();  
 startNanocubeOutputLoggingTask(executorService, nanocubeProcess);  
   
 OutputStream inPipeOutputStream = nanocubeProcess.getOutputStream();  
 ...  
 ...  
As seen above, when we start Nanocube process, we fetch output end of the process input "pipe" in form of OutputStream, and we will use that to stream the data into Nanocube.

One interesting piece above is the way we handle Nanocube process output - regardless if it was error or standard output, we start new task in another thread (thus java.util.concurrent.Executor) to read that output and print it via some logging framework used in our java application (here plain java.util.logging loggers). Here is relevant startNanocubeOutputLoggingTask method:
 private static void startNanocubeOutputLoggingTask(Executor executor, final Process nanocubeProcess) {  
   InputStream outPipeInputStream = nanocubeProcess.getInputStream();  
   Runnable runnable = new Runnable() {  
     @Override  
     public void run() {  
       try {  
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(outPipeInputStream))) {  
           String line;  
           while ((line = reader.readLine()) != null) {  
             nanocubeOutLogger.info(line);  
           }  
         }  
       } catch (IOException e) {  
         appLogger.log(Level.SEVERE, "Error reading from Nanocube output: " + e.getMessage(), e);  
       }  
     }  
   };  
   // start logging task  
   executor.execute(runnable);  
 }  
   
By having ExecutorService instance around, we can use it to stop that logging task at the application shutdown.

And finally, with the code above, and instance of NanocubeDmpEncoder ready (see previous post), we perform the feeding:
 appLogger.info("Streaming DMP content into Nanocube process...");  
   
 NanocubeDmpEncoder dmpEncoder = ...;  

 byte[] headerBytes = dmpEncoder.encodeHeaders();
 inPipeOutputStream.write(headerBytes);

 byte[] recordBytes = dmpEncoder.encodeRecord(....);
 inPipeOutputStream.write(recordBytes);

 inPipeOutputStream.flush();  
 inPipeOutputStream.close();  
   
 appLogger.info("Streaming of data finished");  
   
 nanocubeProcess.waitFor();  
 executorService.shutdown();  
 appLogger.info("Nanocube process stopped");  
   
At the end, we wait for Nanocube process to stop before doing the cleanup of our output logging task.


Wednesday, January 21, 2015

Nanocube feeder in Java, part 1

For some time I've been sporadically following Nanocube project, which is a novel way to index spatio-temporal data, all in memory, enabling very fast analytical querying and visualization of fairly large datasets. Just to get rough impression of Nanocube memory consumption - for up to 10 billions of records, it can require around few tens of GB of RAM, and the querying will work sufficiently fast to allow real-time interactive exploration of such huge pool of data.

Note - this post is based on nanocube version 3.1.

Currently provided way to feed data into nanocube is via python command line tool - nanocube-binning-csv.py.
In a nuthshell, this tool is consuming CSV datasets and converts it into so called "dmp" format which is than piped into nanocube server. This can be cumbersome way for some use cases, and being a java developer I wanted to have some java-friendly way to feed the data, so I set out to develop my own feeder in Java.

DMP Encoder

Biggest portion of my feeder would be DMP encoder which would convert input data, given as plain java objects, into DMP format required by nanocube server. Since documentation about DMP format is somewhat scarce, the easiest way seemed to be analyzing python code within nanocube-binning-csv.py tool, and translate it to Java.

The result encoder is not the most generic one, but focused on the most common use cases - when we have following dimensions - spatial, temporal, count, and arbitrary number of categorical dimensions. Of course, you need accordingly to have compiled binaries of nanocube for given combination of dimensions and their details (such as byte lengths of certain dimensions, zoom level etc..), and few of them are found in nanocube-dir/bin directory, example being nc_q25_c1_c1_u2_u4.

Whole source code for DMP encoder is available here.

The way to use it is as follows.

You first instantiate the encoder by specifying schema for the dataset. Constructor arguments are:
  • name
  • number of location zoom levels (eg. 25 is typically sufficient)
  • time offset
  • length of time bin in seconds
  • length of time value in bytes (2, 4 or 8)
  • map that describes categorical dimensions and their options
 NanocubeDmpEncoder dmpEncoder = new NanocubeDmpEncoder("telco-data", 25, timeOffset, 3600, 2, categoryEnumerations);  
Map of category data is actually a 2-level map. Top level map contains entries where keys are category names and values are maps again, containing enum options specification for given category. Each enum options map ties domain-specific option value to (label, byte) pair which specifies how that option will be presented (label) and also how it will be encoded in nanocube (byte value).

For example, if we want to have categories "NetType" and "Status" which will represent network type and status for some mobile call, then in Java we will usually use enum class, such as:
 public enum NetType {  
   GSM, GPRS, UMTS, EDGE  
 }  
 public enum Status {  
   SUCCEEDED, FAILED  
 }  
So you can define our category definition map something like:
 Map<String, Map<Object, CategoryEnumInfo>> categoryEnumerations = new HashMap<>();  
 categoryEnumerations.put("NetType", createEnumInfosFromEnumClass(NetType.class));  
 categoryEnumerations.put("Status", createEnumInfosFromEnumClass(Status.class));  
 ....  
 private <E extends Enum> Map<Object, CategoryEnumInfo> createEnumInfosFromEnumClass(Class<E> enumClass) {  
   Map<Object, CategoryEnumInfo> enumInfos = new HashMap<>();  
   for (E constant : enumClass.getEnumConstants()) {  
     enumInfos.put(constant, new CategoryEnumInfo(constant.name(), (byte) constant.ordinal()));  
   }  
   return enumInfos;  
 }  
So now when we have NanocubeDmpEncoder instance constructed, now we can encode the headers and records into required DMP format.
 // headers  
 byte[] headerBytes = dmpEncoder.encodeHeaders();  
 // record  
 Map<String, Object> categoryValues = new HashMap<>();  
 categoryValues.put("NetType", NetType.GSM);  
 categoryValues.put("Status", Status.SUCCEEDED);  
 byte[] recordBytes = dmpEncoder.encodeRecord(44.8789661034259, 17.72673345412568, new Date(), 1, categoryValues);  

In second part of this post we will see how to boot Nanocube server from Java process, and how to stream encoded data into it.

Friday, October 24, 2014

Packages and separation of concerns

Recently I had short discussion with my coworker about how to package java code, and it brought back some of my earlier thoughts about it, mostly because over time some of my opinions about the subject changed significantly.

Within a single code base, packages are highest-level way to modularize our code, so they play significant role when one wants to grasp what the code is all about. Thing is that, each class (or interface, or actually any other code artifact represented as a file), can often be looked upon from multiple different point of views, but since we have to place that class in one and only one package, we have to make decision what point of view (out of many) is most significant to us.

For example, in java community, DAOs (data access object) are common "type" of classes which encapsulate data access logic. Thus, we commonly have something like UserDao that contains logic to do CRUD operations on user entities. Actually, following good rule to separate interface from implementation, we most often have UserDao interface and something like HibernateUserDaoImpl implementation (in case Hibernate is used as our ORM framework) or maybe JdbcUserDaoImpl (in case plain JDBC is used).

However, this UserDao class can be viewed from 2 points of view:
  • it is DAO (technical point of view)
  • it is user-related (business domain point of view)
Maybe our app even have multiple deployments based on our customers, so we maybe even have something like AcmeJdbcUserDaoImpl, so beside 2 aspects mentioned above, we can even say for that class to have additional one - it is ACME-deployment-related functionality.

To keep things simple, let's just look at two-aspect version of this DAO. So basically we need to decide how will we package our code, whether by technical aspect (layering), such as:
, or by business aspect:

Actually, the latter approach isn't packaged by business domains in the fullest sense, but I'll explain that in a moment.

Earlier in my career I used former approach, but over time I mostly adopted later one. Maybe it's just a matter of preference, but my reasoning is as follows.

As we already mentioned, in both examples the code is structured based on some concern, technical or domain related, so it seems that in both cases some sense of order is kept there. But following the reasoning that Robert C. Martin gave when explaining single responsibility principle (close cousin to principles of coupling, cohesion and separation of concern), we should group pieces of code that are likely to change due to same reason.

Now, in our example of UserDao, if you frequently have to change DAO implementations in whole app due to desire to use different persistence framework for example, then you would certainly find useful if all DAOs are under one package. But I find these cases quite rare. On the other hand, if some business feature is changed, such as some which is related to user "module", and if for that reason you have to change User entity, as well as UserDao which persists it, and also UserManager that exposes service operation related to user functionality, then you should certainly look to group those code artifacts together. And in my experience, these kind of changes are constantly present.

I can see some other people already blogged about this kind of "vertical" modularization, so here is one example. It also goes kind of hand in hand with SOA/microservices architectures, where you identify pieces of your application from functional perspective, which when grown, can be separated into its own services, thus deployed separately.

Now, I said before that I cheated a bit regarding the second approach presented above. One can notice that I still have web layer separated, although I have user-related web functionality there which is out of "users" package. I guess I still consider web layer such a different concern from business layer that I like to keep it separated. Lot of times we just have changes to web layer that doesn't touch business side, thus this separatiuon. But it definitely got me thinking that it would maybe make nice experiment to just try once putting even UserWebController within "users" package because it often updated when some change is required to user-related functionality.

Another thing - this whole story reminded me of a moment in a past when I was frequently using Tapestry as a web framework of my choice. Common thing in web development back then, as it is also now, was to have HTML template files separated from web controller classes, usually under web app root directory. But I heard occasionally that some Tapestry users like to keep HTML templates under the classpath, in other words, placed within packages, together with other web related code. Thus, one would have something like:
I resisted this idea for some time because it felt so wrong to keep HTML files together with the code, but when I eventually gave it a try, I was enlightened. It was so much easier not having to jump through different project directories and search files whenever I had to change some web feature, and I almost always have to simultaneously change HTML template together with the related controller code.

Of course, if I had been working on some other type of project where I had some dedicated web designer to work solely on HTML templates, then it would be different situation so this way of packaging would probably be totally unsuitable, but since I was both the HTML designer as well as the coder, it really made sense.

You can also notice in picture above that localization messages (.properties files) are also placed within the package and separated for each HTML page (EditUser_en.properties, EditCompany_en.properties...). Although I never practiced this much, and was also using single localization file for all web messages as usual, it was also practiced by some people due to benefits of having related messages as close as possible to related web artifacts (page controller class, page HTML template) for easier maintenance.  Of course, the benefit of usual approach with single localization file is that it is certainly easier to translate all web messages to some new language if you have all of them placed in single file (web_messages_en.properties, web_messages_de.properties...). But even nowadays, whenever I work on some larger project, I stumble upon situations where these single localization message files contain much of old garbage messages. It is a garbage left due to exact reason that it is quite easy to forget to clean up these localization messages once some web stuff has been changed/removed. With the approach given above, you usually never forget to do that because these localization files are so close to file that you just changed/removed.

Saturday, August 16, 2014

Embedded Kafka and Zookeeper for unit testing

Recently I wanted to setup embedded Kafka cluster for my unit tests, and suprisingly it wasn't that trivial because most of examples I found around were made for some older versions of Kafka/Zookeeper or they didn't work for some other reasons, so it took me some time to find some proper version.

The project I took it from is Camus which is Kafka->Hadoop ETL project, and I just made some slight changes related to newer Zookeeper, as well as some changes configuration-wise.

My embedded Kafka & Zookeeper servers are available at this gist. All the code is tested against Kafka 0.8.1.1 and Zookeeper 3.4.6.

Here is simple example on how to setup Zookeeper at fixed port (2181), and 2 Kafka servers at random available ports:

     EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper(2181);  
     List<Integer> kafkaPorts = new ArrayList<Integer>();  
     // -1 for any available port  
     kafkaPorts.add(-1);  
     kafkaPorts.add(-1);  
     EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(embeddedZookeeper.getConnection(), new Properties(), kafkaPorts);  
     embeddedZookeeper.startup();  
     System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());  
     embeddedKafkaCluster.startup();  
     System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());  
     Thread.sleep(10000);  
     embeddedKafkaCluster.shutdown();  
     embeddedZookeeper.shutdown();  


Thursday, June 12, 2014

Running Cascading Hadoop jobs via CLI, Oozie, or an IDE (part 2)

We are continuing now on the previous post our sample project setup that will allow us to run Cascading jobs within few usual environments - CLI, Oozie and IDE.

For our sample job, we'll take all too boring "word count" example. It would be best if we could code it in such a way to satisfy few requirements:
  • it should be triggered in the same way from any of 3 target environments
  • it should have externalized job parameters (such as "word count" input and output HDFS paths)
  • it should have externalized Hadoop configuration to be able to experiment with few of those
  • it should be able to take standard hadoop/yarn CLI command parameters
  • it should be able to take single input path parameter provided in form of multiple comma-separated paths for cases when Oozie coordinator takes multiple dataset instances during workflow submission
  • it should set custom mapreduce job name to improve job visibility on some GUIs (such as Hue)
Taking all these intro consideration, we decided to use recommended way to develop job "driver" applications for hadoop/yarn CLI commands - and that is using Hadoop Tool interface. That way it would be able to parse all standard parameters that these CLI commands provide, such as specifying different configuration file for job submission. 

Because Tool application is plain java application anyway, it can be also called from within an IDE. And finally, Oozie can also include it into its workflow as "java action node".

 public class WordCount extends Configured implements Tool {  
   public static void main(String[] args) throws Exception {  
     int exitCode = ToolRunner.run(new WordCount(), args);  
     System.exit(exitCode);  
   }  
   @Override  
   public int run(String[] args) throws Exception {  
     String inputPath = args[0];  
     String outputPath = args[1];  
     Configuration conf = getConf();  
     doWorkflow(inputPath, outputPath, conf);  
     return 0;  
   }  
 ...  
 }  

Job parameters - input and output path in this case, are provided as standard java program arguments. But as previously mentioned, when using Oozie, frequently an input argument is single String value containing comma-separated HDFS paths, because Oozie coordinator can be instructed to take multiple instances of some dataset and process them in batch, and it basically generates such comma-separated String value as input argument for triggered Oozie workflow. So its useful to construct source Tap from such String value. So here it goes:

   private Tap constructMultiHfsSourceTap(Scheme scheme, String inputPath) {  
     List<Tap> tapList = new ArrayList<Tap>();  
     String[] splits = inputPath.split(",");  
     for (String split : splits) {  
       tapList.add(new Hfs(scheme, split.trim()));  
     }  
     Tap[] taps = tapList.toArray(new Tap[tapList.size()]);  
     return new MultiSourceTap(taps);  
   }  

We couldn't use Cascading's GlobHfs here since individual paths are not part of some hierarchical structure.

Once this job hits the road, it would be great to easily see few main pieces of information about it on some kind of job UI, so we'll set its mapreduce job name:

 ....  
     Flow flow = flowConnector.connect(flowDef);  
     // set mapreduce job name  
     String mapReduceJobName = "Cascading Word Count: '" + inputPath + "' -> '" + outputPath + "'";  
     FlowStepStrategy flowStepStrategy = constructMapReduceJobNameStrategy(mapReduceJobName);  
     flow.setFlowStepStrategy(flowStepStrategy);  
     flow.complete();  
   }  
   private FlowStepStrategy constructMapReduceJobNameStrategy(final String mapReduceJobName) {  
     return new FlowStepStrategy() {  
       @Override  
       public void apply(Flow flow, List predecessorSteps, FlowStep flowStep) {  
         Object config = flowStep.getConfig();  
         if (config instanceof JobConf) {  
           ((JobConf) config).setJobName(mapReduceJobName);  
         }  
       }  
     };  
   }  

Complete sample "word count" project is avaiable on GitHub.

Job submission

IDE

Ok, now when we have all the code in place, we can finally run it locally within our IDE simply by calling the job driver application as any other java application. IDE will take care to put all necessary classes/jars on classpath.

 java -classpath <complete classpath here set by IDE> vmarcinko.cascading.wordcount.WordCount -conf conf/local-site.xml /home/vmarcinko/cascadingtest/sherlock_holmes.txt /home/vmarcinko/cascadingtest/wordcount  

As can be seen, we provided Hadoop configuration file (local-site.xml) by using "-conf" program argument which is standard argument parsed by ToolRunner utility class. In other words, we can use same standard arguments that can be used when submitting job via CLI as shown next.

Sample sherlock_holmes.txt file used in this example is available at <project dir>/data directory. If everything went good (and it should!), then word counts are found in part-xxxx file under output directory.

Command Line Interface (CLI)

First, we must package our job application in suitable form using Gradle Shadow plugin as described in part 1 of this post. The end result of "gradle shadowJar" task would be:
<cascading-wordcount project dir>/build/libs/cascading-wordcount-1.0-all.jar

Next we upload that JAR file into Hadoop cluster, place sample .txt file in HDFS path of our choice, and finally submit the job using shell command:

 yarn jar cascading-wordcount-1.0-all.jar vmarcinko.cascading.wordcount.WordCount /user/cloudera/cascadingtest/sherlock_holmes.txt /user/cloudera/cascadingtest/wordcount  

Shell command "yarn" is available in newer versions of Hadoop. Older version used "hadoop" command.

Oozie

To invoke the same vmarcinko.cascading.wordcount.WordCount application from Oozie, we need to use "java action node" within our Oozie workflow to launch it.

Anyway, we use the same shadow JAR (cascading-wordcount-1.0-all.jar) and place it under <oozie workflow HDFS dir>/lib directory. Under program arguments, it would be best to parametrize this java action node with ${inputPath} and ${outputPath}, so we can provide concrete HDFS paths when submitting the workflow.

When the job is launched via Oozie (either manually submitting workflow, or in scheduled manner via Oozie coordinator), we can see our running job nicely in some UI (such as Hue Job Browser in this example): Name of job corresponds to mapreduce job name that we set prior to execution.


(as usual when java application is called via Oozie's "java action node", for each such launch, Oozie initially starts a Map task that acts as launcher for specified java application, thus we end up with 2 jobs shown above)

I hope this post proves useful to all newbies trying to find some common way to set up Cascading job applications that can be triggered from various environments.