Move Files from Amazon S3 to HDFS Using Hortonworks DataFlow (HDF) / Apache NiFi

This article continues our series on using Hortonworks DataFlow (HDF), powered by Apache NiFi, to design DataFlows. HDF is a powerful and easy-to-use tool for distributing and processing data.

In our previous article we designed a simple DataFlow to move data from local storage to HDFS running on top of HDP. If you want to catch up with Hortonworks DataFlow (HDF) or Apache NiFi, please check out that article, which provides a brief introduction to the technology.

Apache NiFi

In this article we will design a moderately complex DataFlow that keeps an eye on an AWS S3 bucket. Whenever a new object lands in the bucket, we will grab it and put it into HDFS running on top of HDP.

There are a number of ‘Processors’ in NiFi, and that number keeps growing. Apache NiFi includes Processors for working with Amazon Web Services (AWS). At the time of this writing, the available AWS Processors cover Amazon S3, Amazon SQS and Amazon SNS; this list might grow in the future.

DataFlow Design Components

To design this DataFlow we will need a few specific Processors. Before we dig into the Processors we will select and configure, let me document the process we will follow:

  • Have an Amazon S3 bucket where the content will be available.
  • Create a notification topic or queue in AWS. This will be used to deliver the events when a new object is added to the bucket.
  • Configure an ‘event’ on the Amazon S3 bucket which will send a notification to that topic or queue when a new object is created.
  • Move on to NiFi canvas and start configuring the Processors.
  • For my example I have configured a queue in Amazon SQS and will use it to receive all the events. NiFi will poll this queue for new events; yes, we will be using a Processor to poll Amazon SQS.

    If you prefer working with Amazon SNS you can use it in your flow.

    NiFi Processors

For this example we will be using a handful of different Processors. I will not walk through the full configuration for each Processor; instead I will call out the important settings that need to be present, modified or set.

  • GetSQS – Fetches messages from an Amazon SQS queue. Since we are using Amazon SQS, this is the Processor we need. If you prefer Amazon SNS, choose the corresponding Processor instead.
  • SplitJSON – This Processor splits a JSON file into multiple, separate FlowFiles for an array element specified by a JsonPath expression. Since the response from the queue is in JSON format, this is ideal for us.
  • ExtractText – This Processor evaluates one or more regular expressions against the content of a FlowFile and assigns the results to FlowFile Attributes. We could have used EvaluateRegularExpression, but it is deprecated as of 0.2.0.
  • FetchS3Object – Retrieves the content of an S3 Object and writes it to the content of a FlowFile.
  • UpdateAttribute – This Processor updates the attributes of a FlowFile using the properties or rules defined by the user.
  • PutHDFS – This Processor writes FlowFiles to the HDFS cluster defined by the user.
So here is how the flow will be designed in NiFi:

    GetSQS => SplitJSON => ExtractText => FetchS3Object => UpdateAttribute => PutHDFS

    Now let’s get started with the implementation.

    Amazon Configuration

For this DataFlow I have created an Amazon S3 bucket, my-landing-bucket, and a queue, my-nifi-transfer, in Amazon SQS. There are, however, a few configurations which need to be performed.

  • On the Amazon SQS side I have kept the default configuration for the Visibility Timeout and the Retention Period. The important configuration for the queue is the permissions. I have kept things simple and opened up the permissions on the queue. You can customize the permissions for your own queue with security in mind.
  • Here is how the permissions for my-nifi-transfer look for me.

    sqs-permission

    Again please note that this is highly insecure and NOT recommended. This is a temporary queue just for the sake of this demo.

  • Once the queue is ready, it is time to configure “Events” on the S3 bucket. Create an “Event” on the Amazon S3 bucket which will send a notification to the queue we just configured. The actual “Event” for which we need the notification is ObjectCreated (All). A scripted sketch of the queue policy and this event configuration follows after this list.
  • Below is how the notification event for my-landing-bucket is configured.

    s3-events
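If you would rather script this setup than click through the AWS console, below is a rough boto3 sketch of both steps: opening up the queue policy and wiring the ObjectCreated (All) event to it. The bucket and queue names match this demo; the region, account ID and the wide-open policy are placeholders you should replace (and tighten) for your own environment.

    import json
    import boto3

    REGION = "us-east-1"          # assumed region for this demo
    ACCOUNT_ID = "123456789012"   # placeholder AWS account ID
    BUCKET = "my-landing-bucket"
    QUEUE = "my-nifi-transfer"

    sqs = boto3.client("sqs", region_name=REGION)
    s3 = boto3.client("s3", region_name=REGION)

    queue_url = sqs.create_queue(QueueName=QUEUE)["QueueUrl"]
    queue_arn = "arn:aws:sqs:%s:%s:%s" % (REGION, ACCOUNT_ID, QUEUE)

    # Wide-open SendMessage policy, i.e. the "opened up" permissions from the demo.
    # Do NOT use this outside of a throwaway queue.
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": "*",
            "Action": "SQS:SendMessage",
            "Resource": queue_arn,
        }],
    }
    sqs.set_queue_attributes(QueueUrl=queue_url,
                             Attributes={"Policy": json.dumps(policy)})

    # S3 "Event": notify the queue for every ObjectCreated (All) event.
    s3.put_bucket_notification_configuration(
        Bucket=BUCKET,
        NotificationConfiguration={
            "QueueConfigurations": [{
                "QueueArn": queue_arn,
                "Events": ["s3:ObjectCreated:*"],
            }]
        },
    )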

    DataFlow Designing

    It’s time to start designing the DataFlow in Apache NiFi.

Each ‘Processor’ has some basic details which need to be filled in, and these will vary from one use case to another. I will provide the critical settings for each Processor in this demo.

    GetSQS

Make sure you provide the right Queue URL and that you have the right AWS credentials set up.
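For reference, what GetSQS retrieves is the raw S3 event notification from the queue. Here is a hedged boto3 sketch of the equivalent polling (NiFi does this for you; the queue URL below is a placeholder):

    import boto3

    # Placeholder queue URL; use the same URL you configure in the GetSQS Processor.
    QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/my-nifi-transfer"

    sqs = boto3.client("sqs", region_name="us-east-1")
    resp = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=1,
                               WaitTimeSeconds=10)

    for msg in resp.get("Messages", []):
        print(msg["Body"])  # the S3 event notification JSON handed downstream
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])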

    SplitJSON

    Since the response from Amazon SQS will be in JSON format we will be using this Processor. We have to pull out the S3 Object Key information from the JSON response received in the previous Processor (GetSQS).

    Under “Properties” for this Processor set the value for Json Path to $.Records[*].s3.object.key.
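To see what that JsonPath pulls out, here is a small Python sketch run against a trimmed-down S3 event notification; the object key is a made-up example:

    import json

    # Trimmed-down S3 event notification, as delivered to the SQS queue.
    body = json.loads("""
    {
      "Records": [
        {"s3": {"bucket": {"name": "my-landing-bucket"},
                "object": {"key": "incoming/2015/sample.txt"}}}
      ]
    }
    """)

    # Equivalent of the JsonPath $.Records[*].s3.object.key
    keys = [record["s3"]["object"]["key"] for record in body["Records"]]
    print(keys)  # ['incoming/2015/sample.txt']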

    ExtractText

We will assign the S3 Object Key information received previously to an attribute. For this you will have to add a new property under the “Properties” tab for this Processor.

Add a property with the name filename and the value (.*).
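The effect is roughly the following: the single capture group matches the entire FlowFile content, which after SplitJSON is essentially the object key, and that value becomes the filename attribute. In Python terms, with a made-up key:

    import re

    content = "incoming/2015/sample.txt"   # FlowFile content after SplitJSON
    match = re.match(r"(.*)", content)
    attributes = {"filename": match.group(1)}
    print(attributes)   # {'filename': 'incoming/2015/sample.txt'}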

    FetchS3Object

In this part we will download the object from the S3 bucket. Make sure that you configure the right values for the AWS credentials and bucket information.

Ensure that the value for Object Key is set to ${filename}.
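Conceptually this Processor does what the boto3 snippet below does with the resolved filename attribute: fetch the object and stream its body into the FlowFile content. The bucket name matches the demo; the key is a made-up example.

    import boto3

    s3 = boto3.client("s3", region_name="us-east-1")

    # ${filename} as resolved from the FlowFile attribute (made-up example key)
    filename = "incoming/2015/sample.txt"

    obj = s3.get_object(Bucket="my-landing-bucket", Key=filename)
    data = obj["Body"].read()   # this byte stream becomes the FlowFile content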

    UpdateAttribute

The PutHDFS Processor seems to have issues with slashes (/), and the S3 Object Key includes slashes in its path. We will change them to dashes (-).

Add a new property with the name filename and the value ${filename:replaceAll("/", "-")}.
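In other words, an object key such as incoming/2015/sample.txt becomes incoming-2015-sample.txt before it reaches PutHDFS. The same transformation in Python, with a made-up key:

    filename = "incoming/2015/sample.txt"   # example S3 object key
    filename = filename.replace("/", "-")   # same effect as replaceAll("/", "-")
    print(filename)                         # incoming-2015-sample.txt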

    PutHDFS

This is the final Processor in the flow. Make sure you have the right values set for your environment, and in particular the right value provided under Directory.

In my case that happens to be /s3_files.
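If you want to double-check the result without shelling into the cluster, one option is the WebHDFS REST API. A minimal sketch, assuming WebHDFS is enabled and the NameNode listens on the default port 50070 (the hostname and user are placeholders):

    import requests

    NAMENODE = "http://namenode.example.com:50070"   # placeholder NameNode host
    resp = requests.get(NAMENODE + "/webhdfs/v1/s3_files",
                        params={"op": "LISTSTATUS", "user.name": "nifi"})

    # Print the name and size of each file PutHDFS wrote to /s3_files
    for entry in resp.json()["FileStatuses"]["FileStatus"]:
        print(entry["pathSuffix"], entry["length"])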

    DataFlow Final Design

Here is a screenshot of the design which should work for this DataFlow.

    nifi-final-design

There are a few relationships which you will have to handle for each Processor. I hope the screenshot above gives you a good idea of all the different relationships I have branched out.

Below are a few screenshots where you can see the files uploaded to the Amazon S3 bucket and then moved to HDFS running on top of HDP.

    files-in-s3

    files-in-hdfs

    Conclusion

As Apache NiFi continues to impress, we can keep designing easy-to-use and reliable DataFlows. It has the potential to solve some really complex problems in data ingestion, distribution and processing.

What’s more, you can save a DataFlow as a ‘template’ and export it in XML format. The ‘template’ can then be imported into a different NiFi instance.

I will be coming up with a few more articles on Apache NiFi in the coming weeks.
