Sqoop Advanced

We often see WHERE $CONDITIONS in Sqoop queries, and I personally did not know what it was for until I recently found out.


Sqoop performs highly efficient data transfers by inheriting Hadoop’s parallelism. To help Sqoop split your query into multiple chunks that can be transferred in parallel, you need to include the $CONDITIONS placeholder in the WHERE clause of your query. Sqoop will automatically substitute this placeholder with the generated conditions specifying which slice of data should be transferred by each individual task. While you could skip $CONDITIONS by forcing Sqoop to run only one map task using the --num-mappers 1 parameter, such a limitation would have a severe performance impact.
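
As a rough sketch of what this looks like on the command line, the function below wraps a free-form query import. The host, credentials, table, columns, and target directory (mysql.example.com, cities, id, /user/sqoop/cities) are placeholder assumptions, not values from a real system. The query is single-quoted so that the shell passes $CONDITIONS through to Sqoop literally instead of expanding it as a shell variable.

```shell
# Hypothetical free-form query import; host, user, table and columns are placeholders.
# Wrapped in a function so nothing runs until you call it.
import_cities_by_query() {
  sqoop import \
    --connect jdbc:mysql://mysql.example.com/sqoop \
    --username sqoop \
    --password sqoop \
    --query 'SELECT id, city, country FROM cities WHERE $CONDITIONS' \
    --split-by id \
    --target-dir /user/sqoop/cities \
    --num-mappers 4
}
```

Calling import_cities_by_query starts the import. If the query has to be wrapped in double quotes instead, the placeholder needs to be escaped as \$CONDITIONS.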

Sqoop Performance Tuning Best Practices
Tune the following Sqoop arguments in the JDBC connection or Sqoop mapping to optimize performance:
  • num-mappers
  • batch
  • split-by and boundary-query
  • direct
  • fetch-size
Num-Mappers
--num-mappers: Use n map tasks to import in parallel. Sqoop can import data in parallel from database sources. You can specify the number of map tasks (parallel processes) used for the import with the -m or --num-mappers argument. Each of these arguments takes an integer value that sets the degree of parallelism. By default, four tasks are used.

To optimize performance, set the number of map tasks to a value lower than the maximum number of connections that the database supports and that the user is allowed to open.

Usage: --num-mappers 10

Note: Controlling the amount of parallelism that Sqoop uses to transfer data is the main way to control the load on your database. Using more mappers leads to a higher number of concurrent data transfer tasks, which can result in faster job completion. However, it also increases the load on the database, because Sqoop executes more concurrent queries. Choose the value carefully, based on the connections allowed for the user and the available network bandwidth.

 
Inserting Data in Batches
When you export data, the --batch parameter tells Sqoop to group related SQL statements into a batch.

Enable JDBC batching using the --batch parameter:

    sqoop export \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --table <<TABLE_NAME>> \
      --export-dir <<FOLDER_URI>> \
      --batch

The second option is to use the property sqoop.export.records.per.statement to specify the number of records that will be used in each insert statement:

    sqoop export \
      -Dsqoop.export.records.per.statement=10 \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --table <<TABLE_NAME>> \
      --export-dir <<FOLDER_URI>>

Finally, you can set how many insert statements will be executed per transaction with the sqoop.export.statements.per.transaction property:

    sqoop export \
      -Dsqoop.export.statements.per.transaction=10 \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --table <<TABLE_NAME>> \
      --export-dir <<FOLDER_URI>>

The default values can vary from connector to connector. Sqoop defaults to disabled batching and to 100 for both the sqoop.export.records.per.statement and sqoop.export.statements.per.transaction properties.
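
To see how the two properties interact, the sketch below multiplies them to get the number of rows written per commit and then passes both to a single export. The values, host, credentials, table, and directory are illustrative assumptions only; the function name export_cities is made up for this example.

```shell
# Illustrative values only; tune them for your database and connector.
records_per_statement=100        # rows packed into each INSERT statement
statements_per_transaction=100   # INSERT statements executed before each commit

# Rows written per commit with these two settings.
rows_per_transaction=$(( records_per_statement * statements_per_transaction ))
echo "rows per transaction: $rows_per_transaction"   # prints: rows per transaction: 10000

# Hypothetical export using both properties; nothing runs until the function is called.
export_cities() {
  sqoop export \
    -Dsqoop.export.records.per.statement="$records_per_statement" \
    -Dsqoop.export.statements.per.transaction="$statements_per_transaction" \
    --connect jdbc:mysql://mysql.example.com/sqoop \
    --username sqoop \
    --password sqoop \
    --table cities \
    --export-dir /user/sqoop/cities
}
```

With the defaults of 100 and 100, each writer task commits once every 10,000 rows. Larger values mean fewer commits but bigger transactions on the database side.
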
Custom Boundary Queries

To split the data into slices that independent map tasks can transfer in parallel, Sqoop needs to find the minimum and maximum values of the column specified in the --split-by argument. In a table-based import, Sqoop takes that column and generates the query select min(col), max(col) from tbl to identify the boundaries. In a free-form query import, it uses the entire query specified on the command line as a subquery in place of the table name, resulting in the query select min(col), max(col) from (query). This is highly inefficient, because the database must materialize the full result set just to compute the import boundaries, before any data is moved.

The --boundary-query argument specifies the query that returns the range of values to import. You can use boundary-query if you do not get the desired results by using the split-by argument alone.

When you configure the boundary-query argument, you must specify the min(id) and max(id) along with the table name. If you do not configure the argument, Sqoop runs the default min/max query described above. The following command supplies a custom boundary query:

    sqoop import \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --query <<QUERY>> \
      --split-by <<ID>> \
      --target-dir <<TARGET_DIR_URI>> \
      --boundary-query "select min(<<ID>>), max(<<ID>>) from <<TABLE>>"

Importing Data in Direct Mode

The --direct argument selects the direct import fast path when you import data from an RDBMS.

Rather than using the JDBC interface for transferring data, the direct mode delegates the job of transferring data to the native utilities provided by the database vendor. In the case of MySQL, the mysqldump and mysqlimport utilities will be used for retrieving data from the database server or moving data back. In the case of PostgreSQL, Sqoop will take advantage of the pg_dump utility to import data. Using native utilities will greatly improve performance, as they are optimized to provide the best possible transfer speed while putting less burden on the database server. There are several limitations that come with this faster import. For one, not all databases have native utilities available, so this mode is not available for every supported database. Out of the box, Sqoop has direct support only for MySQL and PostgreSQL.

    sqoop import \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --table <<TABLE_NAME>> \
      --direct

Importing Data using Fetch-size

The --fetch-size argument specifies the number of entries that Sqoop fetches from the database at a time.

Use the following syntax:

    --fetch-size=<n>

Where <n> represents the number of entries that Sqoop must fetch at a time. Default is 1000.

Increase the value of the fetch-size argument based on the volume of data that needs to be read. Set the value based on the available memory and bandwidth.
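
For illustration, a table import with a larger fetch size might look like the sketch below. The PostgreSQL host, database, credentials, table name, and the value 5000 are all assumptions chosen for the example; a suitable value depends on row width and on the memory available to each map task.

```shell
# Hypothetical import that fetches 5000 rows at a time instead of the default 1000.
# Wrapped in a function so nothing runs until you call it.
import_orders_with_fetch_size() {
  sqoop import \
    --connect jdbc:postgresql://db.example.com/sales \
    --username sqoop \
    --password sqoop \
    --table orders \
    --fetch-size 5000
}
```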

Controlling Parallelism

The --num-mappers argument specifies the number of map tasks that can run in parallel. Default is 4. To optimize performance, set the number of map tasks to a value lower than the maximum number of connections that the database supports.

Use the parameter --num-mappers if you want Sqoop to use a different number of mappers. For example, to suggest 10 concurrent tasks, use the following Sqoop command:

    sqoop import \
      --connect jdbc:mysql://mysql.example.com/sqoop \
      --username sqoop \
      --password sqoop \
      --table cities \
      --num-mappers 10

Split-By

The --split-by argument specifies the column that Sqoop uses to split the work units.

Use the following syntax:

    --split-by <column name>
    
    sqoop import \
      --connect <<JDBC URL>> \
      --username <<SQOOP_USER_NAME>> \
      --password <<SQOOP_PASSWORD>> \
      --query <<QUERY>> \
      --split-by <<ID>>

Note: If you do not specify a column name, Sqoop splits the work units based on the primary key.

Controlling Parallelism

Sqoop imports data in parallel from most database sources. You can specify the number of map tasks (parallel processes) to use to perform the import by using the -m or --num-mappers argument. Each of these arguments takes an integer value which corresponds to the degree of parallelism to employ. By default, four tasks are used. Some databases may see improved performance by increasing this value to 8 or 16. Do not increase the degree of parallelism beyond that available within your MapReduce cluster; tasks will run serially and will likely increase the amount of time required to perform the import. Likewise, do not increase the degree of parallelism higher than that which your database can reasonably support. Connecting 100 concurrent clients to your database may increase the load on the database server to a point where performance suffers as a result.

When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range. For example, if you had a table with a primary key column of id whose minimum value was 0 and maximum value was 1000, and Sqoop was directed to use 4 tasks, Sqoop would run four processes which each execute SQL statements of the form SELECT * FROM sometable WHERE id >= lo AND id < hi, with (lo, hi) set to (0, 250), (250, 500), (500, 750), and (750, 1001) in the different tasks.
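
The range arithmetic in the example above can be reproduced with a few lines of shell. This is only an illustration of the even split described in the text, not Sqoop's actual splitter code. The function name split_ranges and its arguments are made up for this sketch, and it assumes an integer column and a range at least as large as the number of tasks.

```shell
# Print one WHERE predicate per map task for an integer split column.
# Usage: split_ranges <column> <min> <max> <num_mappers>
split_ranges() {
  col=$1; lo=$2; max=$3; n=$4
  step=$(( (max - lo) / n ))    # width of each slice (integer division)
  i=1
  while [ "$i" -le "$n" ]; do
    if [ "$i" -eq "$n" ]; then
      # The last task takes any remainder and must include the maximum value.
      echo "$col >= $lo AND $col < $(( max + 1 ))"
    else
      hi=$(( lo + step ))
      echo "$col >= $lo AND $col < $hi"
      lo=$hi
    fi
    i=$(( i + 1 ))
  done
}

split_ranges id 0 1000 4
# id >= 0 AND id < 250
# id >= 250 AND id < 500
# id >= 500 AND id < 750
# id >= 750 AND id < 1001
```

Each printed predicate corresponds to the condition that one map task would use in place of its slice of the table.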

If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. In that case, explicitly choose a different column with the --split-by argument, for example --split-by employee_id. Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.

If a table does not have a primary key defined and the --split-by <col> is not provided, then import will fail unless the number of mappers is explicitly set to one with the --num-mappers 1 option or the --autoreset-to-one-mapper option is used. The option --autoreset-to-one-mapper is typically used with the import-all-tables tool to automatically handle tables without a primary key in a schema.
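
A minimal sketch of that last case is shown below: a whole-schema import that asks for four mappers but lets Sqoop fall back to one mapper for any table without a primary key. The host, credentials, and warehouse directory are placeholder assumptions, and the function name import_whole_schema is made up for this example.

```shell
# Hypothetical import of every table in a schema.
# Tables without a primary key fall back to a single mapper instead of failing.
import_whole_schema() {
  sqoop import-all-tables \
    --connect jdbc:mysql://mysql.example.com/sqoop \
    --username sqoop \
    --password sqoop \
    --warehouse-dir /user/sqoop/warehouse \
    --num-mappers 4 \
    --autoreset-to-one-mapper
}
```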
