Apache Nifi - MapD Data Import/Export Processors


#1

Hi All,

I have created a new Apache Nifi processor that ingests data into MapD.

There are three projects in total:

  1. A Maven project that builds a Thrift Java client for MapD - https://github.com/jonrmayer/mapd_thrift_maven
  2. A Maven project that builds a Nifi controller service that exposes the MapD client and session details - https://github.com/jonrmayer/mapdservices-bundle
  3. A Maven project that builds two Nifi processors - https://github.com/jonrmayer/mapdprocessors-bundle
  • Imports data into MapD
  • Exports data from MapD

You will need to build them in the above order (please see the notes on each project).

Once built, you will need to place the following files into Apache Nifi’s lib folder before starting it:

  1. nifi-mapdservices-nar-1.0-SNAPSHOT.nar
  2. nifi-mapdservices-api-nar-1.0-SNAPSHOT.nar
  3. nifi-mapdprocessors-nar-1.0-SNAPSHOT.nar

In Nifi’s designer, you will need to create a mapdservice controller with your connection details.

The PutMapDProcessor requires this service, the MapD table name and a buffer size (the Avro batch size).
The GetMapDProcessor requires this service and the SQL SELECT statement.
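To illustrate what a buffer size typically controls, here is a minimal sketch, assuming records arrive as an iterator and each full batch is handed to a loader callback. RecordBatcher and forEachBatch are hypothetical names; this is not code from the mapdprocessors-bundle repository.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not from the mapdprocessors-bundle code): groups
// records into batches of at most bufferSize and hands each batch to a sink,
// e.g. a call that loads one batch into a MapD table.
final class RecordBatcher {

    private RecordBatcher() {}

    // Returns the number of batches delivered; the last batch may be smaller.
    static <T> int forEachBatch(Iterator<T> records, int bufferSize, Consumer<List<T>> sink) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        int batches = 0;
        List<T> current = new ArrayList<>(bufferSize);
        while (records.hasNext()) {
            current.add(records.next());
            if (current.size() == bufferSize) {
                sink.accept(current);                  // batch is full: flush it
                batches++;
                current = new ArrayList<>(bufferSize); // start a fresh batch
            }
        }
        if (!current.isEmpty()) {
            sink.accept(current);                      // flush the final partial batch
            batches++;
        }
        return batches;
    }
}
```

Handing each batch to a callback, rather than collecting every batch first, keeps at most bufferSize records in memory at a time.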

This is a first pass, and I need some help reviewing MapDConversionUtilities.java for inefficiencies and possible optimizations.

Also, I would like to create a spatial ingest processor, but I am having problems with column mismatches for spatial types; all help gratefully received.

If you run into any problems, let me know here or on GitHub.

Regards,

Jonathan


#2

This is awesome work @jonrmayer, thanks for sharing! When you’ve worked out the bugs, we’d love to feature this on the MapD blog, if you’re willing to write a post describing your work. If you’re interested, please either send me a DM here on the board or contact me at randy.zwitch@mapd.com and we’ll figure out the details.

In regards to your question about MapDConversionUtilities.java, I’ll pass this link on to an internal engineer who can hopefully answer whatever questions you have.


#3

Hi @jonrmayer, Randy passed on a note about this project, specifically about your MapDConversionUtilities question. It looks very interesting, and I plan to have a look at it over the next day or so.

Jack


#4

Jonathan,

I got your code running over the weekend and successfully executed the test that stores data in the MapD tables. It’s a nice solution. From your comments I think you developed on Windows, so it might interest you to know that I have been running it on CentOS 7.

First, I should point out that while the Thrift API is relatively stable, it does change, and this could break your code.

The adjustments to dates and timestamps in your code look okay, but have you considered sending those fields as strings in the Thrift binding? I had a look at the JDBC bindings: they send Date, Time and Timestamp values as strings rather than integers, letting the back end deal with the conversion to UTC. This would also remove some Date/Timestamp creation calls in your client.
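A minimal sketch of that suggestion, assuming the incoming values use the standard Avro logical types (date as days since the epoch, time-millis as milliseconds after midnight, timestamp-millis as milliseconds since the epoch in UTC) and assuming the MapD back end parses the string formats shown. AvroTemporalStrings is a hypothetical name, and placing the strings into the Thrift row is left out because it depends on the generated MapD client.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch: render Avro temporal values as strings so the server, not the
// client, does the parsing. The output formats are assumptions about what
// the MapD back end accepts.
final class AvroTemporalStrings {

    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss");
    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    private AvroTemporalStrings() {}

    // Avro "date": days since 1970-01-01, rendered as ISO yyyy-MM-dd.
    static String date(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch).toString();
    }

    // Avro "time-millis": milliseconds after midnight (throws if >= one day).
    static String timeMillis(int millisOfDay) {
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L).format(TIME);
    }

    // Avro "timestamp-millis": milliseconds since the epoch, formatted in UTC.
    // Sub-second precision is dropped by this pattern.
    static String timestampMillis(long epochMillis) {
        return TIMESTAMP.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

For example, date(17532) gives "2018-01-01" and timeMillis(3723000) gives "01:02:03".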

Given how problematic dates and times can be, I spent most of my time looking at their processing.

I didn’t get around to running your module from the Nifi console, but I still hope to give it a go.

Also, you swallow an exception in PutMapDProcessor.java, which was a bit of a pain because the test table name in the code was TEST5 while in the documentation it is TEST.

Thanks for the opportunity to look at your code.

Jack

Also, I was wondering whether you had thought of writing your Avro records to a Kafka queue (in CSV format) and then harvesting them with the MapD stream inserter. I’ve done something similar to stream records directly into a MapD database, and it works very reliably.
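For the Kafka route, each Avro record would first need to be flattened into one delimited line. A minimal sketch, assuming the stream inserter is configured for comma-delimited input and treats an empty field as null (both assumptions; check its options). CsvLine is a hypothetical name, and the Kafka producer itself is omitted.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper: renders one record's field values as a single
// delimited line, quoting fields RFC 4180-style when needed.
final class CsvLine {

    private CsvLine() {}

    // Nulls become empty fields; other values use toString().
    static String of(List<?> fields, char delimiter) {
        StringJoiner line = new StringJoiner(String.valueOf(delimiter));
        for (Object field : fields) {
            line.add(escape(field == null ? "" : field.toString(), delimiter));
        }
        return line.toString();
    }

    // Quotes a field containing the delimiter, a quote or a line break,
    // doubling any embedded double quotes.
    private static String escape(String value, char delimiter) {
        boolean needsQuotes = value.indexOf(delimiter) >= 0
                || value.indexOf('"') >= 0
                || value.indexOf('\n') >= 0
                || value.indexOf('\r') >= 0;
        if (!needsQuotes) {
            return value;
        }
        return '"' + value.replace("\"", "\"\"") + '"';
    }
}
```

Each resulting line would then be the message value sent to the Kafka topic that the stream inserter reads.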


#5

Hi Jack,

Thanks for your efforts - especially over the weekend! My apologies for that test bug, it was resolved in another update to the code that I did on Sunday.

You will notice that there is another processor for extracting data out of MapD via an SQL statement. This was much easier to implement and I can successfully retrieve spatial data types as strings.

My setup is a single node of mapd-os-cpu, and I have been impressed with the throughput I am seeing. I was wondering whether the test results you saw were comparable to other import tools you have used.

Thanks for the tips regarding the date/timestamp calls; I will revise the code to take them into account. I have never worked with Kafka, but I am sure it is possible from Nifi (there are tools for most tasks). I will have a look.

Next, I need to sort out my spatial insert problem. I was hoping to do it at the Thrift Java client level, since I have been experiencing idiosyncrasies with the MapD JDBC driver.

Inserting spatial data via a plain JDBC statement seems to work (slowly), but prepared statements tend to fall over with “Wrong number of columns to load into Table”. I am not sure whether the issue is in the JDBC code or in the MapDHandler.
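Since the plain-statement path works, one stopgap while the prepared-statement error is investigated is to pass each geometry as a Well-Known Text (WKT) string literal. A minimal sketch, assuming the geometry column accepts a WKT literal in an INSERT (consistent with the plain statement working above); WktInsert and the table name geo_test are hypothetical. It only builds the SQL text, which would then be run with java.sql.Statement.execute on a MapD JDBC connection.

```java
// Hypothetical helper: builds the text of a plain INSERT in which the
// geometry is supplied as a quoted WKT string literal.
final class WktInsert {

    private WktInsert() {}

    // WKT for a point; WKT puts x (longitude) before y (latitude).
    static String point(double lon, double lat) {
        return "POINT(" + lon + " " + lat + ")";
    }

    // Quotes a value as a SQL string literal, doubling embedded single quotes.
    static String sqlLiteral(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Builds "INSERT INTO <table> VALUES (...)" from already-rendered SQL
    // literals, so numbers can stay unquoted while strings and WKT are quoted.
    static String insertRow(String table, String... literals) {
        return "INSERT INTO " + table + " VALUES (" + String.join(", ", literals) + ")";
    }
}
```

For example, insertRow("geo_test", "42", sqlLiteral("O'Brien"), sqlLiteral(point(1.0, 2.0))) yields INSERT INTO geo_test VALUES (42, 'O''Brien', 'POINT(1.0 2.0)'). Doubling quotes handles apostrophes, but building SQL text from untrusted input is still risky, and this route remains the slow one.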

Do give the Nifi UI a go; it’s great from a data engineering perspective (data audits, lineage, etc.) and makes my life so much easier :-)

Jonathan


#6

@jonrmayer I was trying to build the MapD Thrift client from the repository you shared: https://github.com/jonrmayer/mapd_thrift_maven

I am having trouble building the client. When I run mvn clean compile, I see the following errors:

Issue 1:

[ERROR] thrift failed output:
[ERROR] thrift failed error: [ERROR:mapd_thrift_maven/src/main/resources/mapd.thrift:65] > > (last token was ‘TDatum’)
Type “TDatum” has not been defined.

[ERROR] thrift failed output:
[ERROR] thrift failed error: [ERROR:mapd_thrift_maven/src/main/resources/mapd.thrift:105] > > (last token was ‘TColumn’)
Type “TColumn” has not been defined.

I believe this issue is caused by circular dependencies between structs in mapd.thrift. I also tried re-importing the Thrift files from mapd-core, but the issue still persists.

Issue 2:
I commented out a few lines to remove the circular dependency (just to make the compiler happy :-) ) to check whether the build goes ahead, but now I see other compilation errors:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project mapd_thrift_maven: Compilation failure: Compilation failure:
[ERROR] /home/abhishek/Documents/workspace/playground/kafka-mapd/mapd_thrift_maven/target/generated-sources/thrift/com/mapd/thrift/server/MapD.java:[2381,19] com.mapd.thrift.server.MapD.AsyncClient.disconnect_call is not abstract and does not override abstract method getResult() in org.apache.thrift.async.TAsyncMethodCall
[ERROR] /home/abhishek/Documents/workspace/playground/kafka-mapd/mapd_thrift_maven/target/generated-sources/thrift/com/mapd/thrift/server/MapD.java:[2396,19] getResult() in com.mapd.thrift.server.MapD.AsyncClient.disconnect_call cannot override getResult() in org.apache.thrift.async.TAsyncMethodCall
[ERROR] return type void is not compatible with java.lang.Object
[ERROR] /home/abhishek/Documents/workspace/playground/kafka-mapd/mapd_thrift_maven/target/generated-sources/thrift/com/mapd/thrift/server/MapD.java:[2800,19] com.mapd.thrift.server.MapD.AsyncClient.start_heap_profile_call is not abstract and does not override abstract method getResult() in org.apache.thrift.async.TAsyncMethodCall
[ERROR] /home/abhishek/Documents/workspace/playground/kafka-mapd/mapd_thrift_maven/target/generated-sources/thrift/com/mapd/thrift/server/MapD.java:[2815,19] getResult() in com.mapd.thrift.server.MapD.AsyncClient.start_heap_profile_call cannot override getResult() in org.apache.thrift.async.TAsyncMethodCall
[ERROR] return type void is not compatible with java.lang.Object
[ERROR] /home/abhishek/Documents/workspace/playground/kafka-mapd/mapd_thrift_maven/target/generated-sources/thrift/com/mapd/thrift/server/MapD.java:[2832,19] com.mapd.thrift.server.MapD.AsyncClient.stop_heap_profile_call is not abstract and does not override abstract method getResult() in org.apache.thrift.async.TAsyncMethodCall
[ERROR] /home/abhishek/Documents/workspace/playground/kafka-mapd/mapd_thrift_maven/target/generated-sources/thrift/com/mapd/thrift/server/MapD.java:[2847,19] getResult() in com.mapd.thrift.server.MapD.AsyncClient.stop_heap_profile_call cannot override getResult() in org.apache.thrift.async.TAsyncMethodCall
[ERROR] return type void is not compatible with java.lang.Object
[ERROR] /home/abhishek/Documents/workspace/playground/kafka-mapd/mapd_thrift_maven/target/generated-sources/thrift/com/mapd/thrift/server/MapD.java:[2931,19] com.mapd.thrift.server.MapD.AsyncClient.clear_cpu_memory_call is not abstract and does not override abstract method getResult() in org.apache.thrift.async.TAsyncMethodCall

PS: Some information about my environment:

thrift --version
Thrift version 0.9.1

cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=18.04

java -version
java version "1.8.0_181"

Thanks, I appreciate your help.