Spark BigQuery Connector


Venturing into Data Science and deciding on a tool to solve a given problem can be challenging, especially when you have a wide array of choices. In this age of data transformation, organizations are constantly seeking ways to improve their day-to-day handling of the data they produce and to minimize the cost of those operations, so it has become imperative to handle such data transformations in the Cloud, where they are easier to manage and more cost-efficient.

Data Warehousing architectures have changed rapidly over the years, and most of the notable service providers are now Cloud-based. Companies are therefore increasingly moving to such Cloud offerings, as they provide a lower upfront cost, better scalability, and better performance than traditional On-premise Data Warehousing systems. Google BigQuery is one of the most well-known and widely adopted Cloud-based Data Warehouses.

The advent of Big Data gave rise to frameworks like Hadoop and Spark. Both are developed by the Apache Software Foundation and are widely used open-source frameworks for Big Data architectures. Each framework contains an extensive ecosystem of open-source technologies that prepare, process, manage, and analyze big data sets. The BigQuery Connector is a library that allows Spark and Hadoop applications to analyze BigQuery data and write data to BigQuery using BigQuery's native terminology.

In this article, you will gain information about Spark BigQuery Connector. You will also gain a holistic understanding of Google BigQuery, Apache Spark, Google Storage API, their key features and the steps to be followed to set up Spark BigQuery Connector. Read along to find out in-depth information about Spark BigQuery Connector.

Table of Contents

Introduction to Apache Spark


Apache Spark, created by a group of Ph.D. students at UC Berkeley in 2009, is a unified analytics engine containing multiple libraries for Big Data processing, with dedicated modules for streaming, SQL, Machine Learning, and graph processing. Apache Spark's simple APIs can process significant amounts of information, while end-users scarcely need to think about task and resource management across machines, which is handled entirely by Spark's engine.

Apache Spark is designed for fast processing and general-purpose workloads. One of its main highlights is the capacity to run computations over large Datasets in memory, and it is also more efficient than MapReduce for complex applications that have to run on disk.

Apache Spark covers a broad scope of workloads that would otherwise require separate distributed systems. By handling these workloads in the same engine, Spark makes it economical and straightforward to combine different processing types, which is essential for building Data Analysis Pipelines.

To have further information about Apache Spark, follow the Official Documentation.

Key Features of Apache Spark


Some of the key features of Apache Spark are as follows:

1) Performance

Apache Spark is well-known for its speed since it processes data in-memory (RAM). Apache Spark's processing speed delivers near Real-Time Analytics, making it a suitable tool for IoT sensors, Credit Card Processing Systems, Marketing Campaigns, Security Analytics, Machine Learning, Social Media Sites, and Log Monitoring.

2) Ease of Use

Apache Spark comes with in-built APIs for Scala, Java, and Python, and it also includes Spark SQL (formerly called Shark) for SQL users. Apache Spark also has simple building blocks, which makes it easy for users to write user-defined functions. You can use Apache Spark in interactive mode to get immediate feedback when running commands. 

3) Data Processing Capabilities

With Apache Spark, you can do more than just plain data processing. Apache Spark can process graphs and also comes with its own Machine Learning Library called MLlib. Due to its high-performance capabilities, you can use Apache Spark for Batch Processing as well as near Real-Time Processing. Apache Spark is a “one size fits all” platform that can be used to perform all tasks instead of splitting tasks across different platforms. 

4) Fault Tolerance

Apache Spark achieves fault tolerance through task retries and speculative execution; because intermediate data is kept in RAM, lost partitions are recomputed from their lineage rather than restored from disk.

5) Security

Apache Spark has security set to “OFF” by default, which can make you vulnerable to attacks. Apache Spark supports authentication for RPC channels via a shared secret. It also supports event logging as a feature, and you can secure Web User Interfaces via Javax Servlet Filters. Additionally, since Apache Spark can run on Yarn and use HDFS features, it can use HDFS File Permissions, Kerberos Authentication, and encryption between nodes.

6) Scalability

Since Big Data keeps on growing, Cluster sizes should increase in order to maintain throughput expectations. Apache Spark scales horizontally by adding nodes to the cluster, typically alongside HDFS for storage, and uses Random Access Memory (RAM) for optimal performance.

7) Cost

Apache Spark is an open-source platform, and it comes for free. However, you have to invest in hardware and personnel or outsource the development. This means you will incur the cost of hiring a team that is familiar with the Cluster administration, software and hardware purchases, and maintenance. 

Introduction to Google BigQuery


Google BigQuery is a Cloud-based Data Warehouse that provides a Big Data Analytic Web Service for processing petabytes of data. It is intended for analyzing data on a large scale. It consists of two distinct components: Storage and Query Processing. It employs the Dremel Query Engine to process queries and is built on the Colossus File System for storage. These two components are decoupled and can be scaled independently and on-demand.

Google BigQuery is a fully managed service; you don't need to deploy any resources such as disks or virtual machines. It is designed to process read-only data. Dremel and Google BigQuery use Columnar Storage for quick data scanning, as well as a tree architecture for executing queries using ANSI SQL and aggregating results across massive computer clusters. Furthermore, owing to its short deployment cycle and on-demand pricing, Google BigQuery is serverless and designed to be extremely scalable.

For further information about Google Bigquery, follow the Official Documentation.

Key Features of Google BigQuery


Some of the key features of Google BigQuery are as follows:

1) Performance

Partitioning is supported by BigQuery, which improves query performance. The data may be readily queried using SQL or Open Database Connectivity (ODBC).

2) Scalability

Being quite elastic, BigQuery separates computation and storage, allowing customers to scale processing and memory resources according to their needs. The tool has significant vertical and horizontal scalability and runs real-time queries on petabytes of data in a very short period.

3) Security

When third-party authorization is involved, users can utilize OAuth as a standard approach to access the service. By default, all data is encrypted at rest and in transit. Cloud Identity and Access Management (IAM) allows for fine-grained administration.

4) Usability

Google BigQuery is a highly user-friendly platform that requires a basic understanding of SQL commands, ETL tools, etc.

5) Data Types

It supports loading data in formats such as CSV, JSON, Avro, Parquet, and ORC.

6) Data Loading

It employs the conventional ELT/ETL Batch Data Loading techniques by employing standard SQL dialect, as well as Data Streaming to load data row by row using Streaming APIs.

7) Integrations

In addition to operational databases, the system supports integration with a wide range of data integration tools, business intelligence (BI), and artificial intelligence (AI) solutions. It also works with Google Workspace and Cloud Platform.

8) Data Recovery

Data backup and disaster recovery are among the services provided by Google BigQuery. Users can query point-in-time snapshots of data changes from the last seven days.

9) Pricing Models

The Google BigQuery platform is available in both on-demand and flat-rate subscription models. Although data storage and querying are charged, exporting, loading, and copying data is free. BigQuery separates computational resources from storage resources, so you are only charged when you run queries, and billing is based on the quantity of data processed by those queries.

A fully managed No-code Data Pipeline platform like Hevo Data helps you integrate and load data from 100+ different sources (including 30+ free sources) to a Data Warehouse such as Google BigQuery or a Destination of your choice in real-time in an effortless manner. Hevo, with its minimal learning curve, can be set up in just a few minutes, allowing users to load data without having to compromise performance. Its strong integration with a multitude of sources allows users to bring in data of different kinds in a smooth fashion without having to write a single line of code.

Get Started with Hevo for Free

Check out some of the cool features of Hevo:

  • Completely Automated: The Hevo platform can be set up in just a few minutes and requires minimal maintenance.
  • Transformations: Hevo provides preload transformations through Python code. It also allows you to run transformation code for each event in the Data Pipelines you set up. You need to edit the event object’s properties received in the transform method as a parameter to carry out the transformation. Hevo also offers drag and drop transformations like Date and Control Functions, JSON, and Event Manipulation to name a few. These can be configured and tested before putting them to use.
  • Connectors: Hevo supports 100+ integrations to SaaS platforms, files, Databases, analytics, and BI tools. It supports various destinations including Google BigQuery, Amazon Redshift, Snowflake Data Warehouses; Amazon S3 Data Lakes; and MySQL, SQL Server, TokuDB, DynamoDB, PostgreSQL Databases to name a few.  
  • Real-Time Data Transfer: Hevo provides real-time data migration, so you can have analysis-ready data always.
  • 100% Complete & Accurate Data Transfer: Hevo’s robust infrastructure ensures reliable data transfer with zero data loss.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (including 30+ free sources) that can help you scale your data infrastructure as required.
  • 24/7 Live Support: The Hevo team is available round the clock to extend exceptional support to you through chat, email, and support calls.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  • Live Monitoring: Hevo allows you to monitor the data flow so you can check where your data is at a particular point in time.
Sign up here for a 14-Day Free Trial!

Understanding Apache Spark BigQuery Connector


The Spark BigQuery Connector is used with Apache Spark to read and write data from and to BigQuery. The connector can read Google BigQuery tables into Spark DataFrames and write DataFrames back to BigQuery. This is accomplished by communicating with BigQuery using the Spark SQL Data Source API.

The BigQuery Storage Read API streams data from BigQuery in parallel over gRPC without the need for Google Cloud Storage as an intermediary.

Key Features of BigQuery Storage Read API

Some of the key features of BigQuery Storage API are as follows:

1) Multiple Streams

Users can use the Storage Read API to read disjoint sets of rows from a table using multiple streams during a session. This facilitates consumption from distributed processing frameworks or from independent consumer threads within a single client.

2) Column Projection 

Users can choose an optional subset of columns to read while creating a session. When tables have a large number of columns, this allows for more efficient reads.

3) Column Filtering

Users can specify basic filter predicates to enable data filtration on the server side before transmitting it to a client.

4) Snapshot Consistency

Storage sessions are read using a snapshot isolation model. Every customer reads based on a specific point in time. The session creation time is used as the default snapshot time, although consumers can access data from an earlier snapshot.

For further information on Google BigQuery Storage Read API, follow the Official Documentation.

Requirements to Set up Spark BigQuery Connector

The requirements to be taken care of before moving forward with setting up Spark BigQuery Connector are as follows:

1) Enable the BigQuery Storage API

The Storage Read API is distinct from the BigQuery API and appears separately as the BigQuery Storage API in the Google Cloud Console. However, the Storage Read API is enabled in all projects where the BigQuery API is enabled, so no further activation steps are required.

2) Create a Google Cloud Dataproc Cluster (Optional)

If you don't have an Apache Spark environment, you can set up a Cloud Dataproc cluster with pre-configured authentication. The connector can also be used on any other Spark cluster instead of Cloud Dataproc.

The 'bigquery' or 'cloud-platform' scopes are required for every Dataproc cluster that uses the API. Dataproc clusters have the 'bigquery' scope by default, so most clusters in enabled projects should work without further changes.

Steps to Set Up Spark BigQuery Connector

The Spark BigQuery Connector uses the cross-language Spark SQL Data Source API.

The steps followed to set up Spark BigQuery Connector are as follows:

Step 1: Providing the Spark BigQuery Connector to your Application

The Spark BigQuery Connector must be available to your application at runtime. This can be achieved in one of the following ways:

  • When you create your cluster, install the Spark BigQuery Connector in the Spark jars directory of every node by using the Dataproc connectors initialization action.
  • You can add the Spark BigQuery Connector at runtime using the --jars parameter, which can be used with the Dataproc API or spark-submit.
    • If you are using Dataproc image 1.5 and above, add the following parameter: --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
    • If you are using Dataproc image 1.4 or below, add the following parameter: --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar
  • Include the jar in your Scala or Java Spark application as a dependency (see Compiling against the Spark BigQuery Connector).

If the Spark BigQuery Connector is not available at runtime, a ClassNotFoundException is thrown.

For further information on Spark BigQuery Connector availability, visit here.

Step 2: Reading Data from a BigQuery Table

For reading data from a BigQuery table, you can refer to the following code blocks.
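
The original code sample did not survive formatting, so here is a minimal PySpark sketch of reading a table through the connector; the table shown is a public BigQuery sample, and the connector jar is assumed to already be on the classpath as described in Step 1.

```python
from pyspark.sql import SparkSession

# Assumes the Spark BigQuery Connector was made available via --jars or
# spark.jars.packages, as described in Step 1.
spark = SparkSession.builder.appName("bigquery-read-example").getOrCreate()

# Read a BigQuery table into a Spark DataFrame.
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .load()

df.printSchema()
df.show(10)
```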

or the Scala only implicit API:

For more information on reading data from BigQuery tables, you can visit here.

Step 3: Reading Data from a BigQuery Query

The Spark BigQuery Connector lets you execute any Standard SQL SELECT query on BigQuery and have the results sent directly to a Spark Dataframe. This is simple to accomplish, as demonstrated by the following code sample:
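
The original sample is missing here, so the following is a hedged PySpark sketch based on the connector's documented query support: the SQL text is passed directly to load() once viewsEnabled and materializationDataset are set (the required settings are listed below). The materialization dataset name is a placeholder the GCP user must be able to create tables in.

```python
# `spark` is the existing SparkSession from the previous step.
# Required settings for query support (see the configuration notes below).
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "my_materialization_dataset")

sql = """
  SELECT tag, COUNT(*) AS c
  FROM `bigquery-public-data.stackoverflow.posts_questions`,
       UNNEST(SPLIT(tags, '|')) AS tag
  GROUP BY tag
  ORDER BY c DESC
  LIMIT 10
"""

# BigQuery executes the query; only the result set is read into Spark.
df = spark.read.format("bigquery").load(sql)
df.show()
```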

The result of the query is returned as a regular Spark DataFrame that you can inspect, for example, with df.show().

A second option is to use the Query option in the following way:
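
A hedged sketch of this second form, which passes the same SQL through the connector's query option instead (the viewsEnabled and materializationDataset settings above still apply):

```python
# `sql` is the query string defined in the previous sketch.
df = spark.read.format("bigquery") \
    .option("query", sql) \
    .load()
```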

The execution is faster as only the result is transmitted over the wire. In a similar way, the queries can include JOINs more efficiently than running joins on Spark or use other BigQuery features such as Subqueries, BigQuery User-defined Functions, Wildcard Tables, BigQuery ML, etc.

In order to use this feature the following configurations MUST be set:

  • "viewsEnabled" must be set to true.
  • "materializationDataset" must be set to a dataset where the GCP user has table creation permission. "materializationProject" is optional.

For further information on reading data from a BigQuery query, visit here.

Step 4: Writing Data to BigQuery

Writing a DataFrame to BigQuery is done in a similar way as above. Note that the process first uploads the data to GCS before loading it into BigQuery, so a GCS bucket must be created to hold this temporary data.
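
As the original snippet is missing, here is a minimal PySpark sketch of the write path; the bucket and table names are placeholders that must point to resources you own.

```python
# `df` is an existing DataFrame, e.g. the one read in the previous steps.
# The data is staged in this GCS bucket before the BigQuery load job runs.
df.write.format("bigquery") \
    .option("temporaryGcsBucket", "my-temporary-bucket") \
    .mode("append") \
    .save("my_dataset.my_output_table")
```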

The data is stored temporarily in the Apache parquet format. Apache ORC is an alternative format.

The GCS bucket and the format can also be set globally using Spark's RuntimeConfig in the following manner:
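
A sketch of the global settings, assuming the temporaryGcsBucket and intermediateFormat option names used elsewhere in the connector's documentation:

```python
# Set once per SparkSession; subsequent BigQuery writes pick these up.
spark.conf.set("temporaryGcsBucket", "my-temporary-bucket")
# Stage the data as ORC instead of the default Parquet (key name is an assumption).
spark.conf.set("intermediateFormat", "orc")

df.write.format("bigquery").save("my_dataset.my_output_table")
```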

While streaming a DataFrame to BigQuery, each batch is written in the same way as a non-streaming DataFrame.

Note that an HDFS-compatible checkpoint location (e.g., a path on HDFS or in a GCS bucket) must be specified.
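
A hedged sketch of a streaming write, assuming the sink accepts the same table and temporaryGcsBucket options plus a checkpointLocation; the source and all paths are placeholders.

```python
# Any streaming DataFrame works; the built-in rate source is used for illustration.
streaming_df = spark.readStream.format("rate").load()

query = streaming_df.writeStream \
    .format("bigquery") \
    .option("temporaryGcsBucket", "my-temporary-bucket") \
    .option("checkpointLocation", "gs://my-temporary-bucket/checkpoints/streaming-demo") \
    .option("table", "my_dataset.my_streaming_table") \
    .start()
```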

To have further information on writing data to Google BigQuery, visit here.

Conclusion

In this article, you have learned about Google BigQuery, Apache Spark, and their key features. This article also provided information on Spark BigQuery Connector, BigQuery Storage Read API and the steps followed to set up Spark BigQuery Connector.

Hevo Data, a No-code Data Pipeline provides you with a consistent and reliable solution to manage data transfer between a variety of sources and a wide variety of Desired Destinations with a few clicks.

Visit our Website to Explore Hevo

Hevo Data with its strong integration with 100+ data sources (including 30+ free sources) allows you to not only export data from your desired data sources & load it to the destination of your choice such as Google BigQuery, but also transform & enrich your data to make it analysis-ready so that you can focus on your key business needs and perform insightful analysis using BI tools. 

Sign up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You may also have a look at the pricing, which will assist you in selecting the best plan for your requirements.

Share your experience of understanding the Spark BigQuery Connector in the comment section below! We would love to hear your thoughts.

Source: https://hevodata.com/learn/spark-bigquery-connector/

spark-bigquery: A Google BigQuery Data Source for Apache Spark

This project provides a Google BigQuery data source () to Apache Spark using the new Google Cloud client libraries for the Google BigQuery API. It supports "direct" import/export where records are directly streamed from/to BigQuery. In addition, data may be imported/exported via intermediate data extracts on Google Cloud Storage (GCS). Note that when using "direct" (streaming) export, data may not be immediately available for further querying/processing in BigQuery. It may take several minutes for streamed records to become "available". See the following resources for more information:

The following import/export combinations are currently supported:

| Format | Direct | Parquet | Avro | ORC | JSON | CSV |
|---|---|---|---|---|---|---|
| Import to Spark (export from BigQuery) | ✔ | ✗ | ✔ | ✗ | ✔ | ✔ |
| Export from Spark (import to BigQuery) | ✔ | ✔ | ✔ | ✔ | ✗ | ✗ |

More information on the various supported formats can be found at:

CSV and JSON are not recommended as data exchange formats between Apache Spark and BigQuery due to their lack of type safety. Better options are direct import/export, Parquet, Avro and ORC.

This data source is used in the sparkbq R package.

Building

Due to dependency version mismatches between Apache Spark and the Google client libraries (e.g. Google Guava), this project builds a fat JAR and uses shading to relocate the relevant Google classes.

Version Information

The following table provides an overview over supported versions of Apache Spark, Scala and Google Dataproc:

| spark-bigquery | Spark | Scala | Google Dataproc |
|---|---|---|---|
| 0.1.x | 2.2.x and 2.3.x | 2.11 | 1.2.x and 1.3.x |

Example

The provided Google BigQuery data source () can be used as follows:
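
The original example was lost in formatting, so the following PySpark sketch only illustrates the general Spark SQL Data Source shape of such a read. The format name and the option keys shown (table, type) are assumptions standing in for the names listed in the Configuration section below; consult that section and the project repository for the real keys.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-example").getOrCreate()

# Hypothetical option keys -- consult the Configuration section for the actual names.
df = spark.read.format("bigquery") \
    .option("table", "my-project.my_dataset.my_table") \
    .option("type", "direct") \
    .load()

df.show()
```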

You can find a complete example at .

To run this example, first compile and assemble the project. Then run:

Local Spark Cluster

Google Cloud Dataproc

Login to service account (it needs to have permissions to access all resources):

Run spark job:

where the arguments are:

  1. Google BigQuery billing project ID
  2. Google BigQuery dataset location (EU, US)
  3. Google Cloud Storage (GCS) bucket where staging files will be located

Using the spark-bigquery Spark package

spark-bigquery is available as a Spark package from https://spark-packages.org/package/miraisolutions/spark-bigquery and, as such, via Maven coordinates. You can simply specify the appropriate Maven coordinates with the --packages option when using the Spark shell or spark-submit.

Using the Spark Shell

Using PySpark

Using

Assume the following Spark application which has been compiled into a JAR named (you may want to use something like Holden Karau's Giter8 Spark project template for this):

You can run this application using in the following way:

Using

Login to service account (it needs to have permissions to access all resources):

Similar to the example above, the Spark application can be submitted to Google Dataproc using

You may choose not to specify a service account key file and use default application credentials instead when running on Dataproc.

Configuration

The three main Spark read/write options include:

  • : The BigQuery table to read/write. To be specified in the form . One of or must be specified.
  • : A SQL query in Google BigQuery standard SQL dialect (SQL-2011). One of or must be specified.
  • (optional): The BigQuery import/export type to use. Options include "direct", "parquet", "avro", "orc", "json" and "csv". Defaults to "direct". See the table at the top for supported type and import/export combinations.

In addition, there are a number of BigQuery configuration options that can be specified in two ways: the traditional way using Spark's read/write options (e.g. ) and using the extension method (; see example above) which is usually more straightforward to use. If you prefer the traditional way or if you are using spark-bigquery in a non-Scala environment (e.g. PySpark), the configuration keys are as follows:

  • (required): Google BigQuery billing project id
  • (required): Geographic location where newly created datasets should reside. "EU" or "US".
  • (optional): Google Cloud service account key file to use for authentication with Google Cloud services. The use of service accounts is highly recommended. Specifically, the service account will be used to interact with BigQuery and Google Cloud Storage (GCS). If not specified, application default credentials will be used.
  • (optional): Prefix of BigQuery staging dataset. A staging dataset is used to temporarily store the results of SQL queries. Defaults to "spark_staging".
  • (optional): Default staging dataset table lifetime in milliseconds. Tables are automatically deleted once the lifetime has been reached. Defaults to 86400000 ms (= 1 day).
  • (required): Google Cloud Storage (GCS) bucket to use for storing temporary files. Temporary files are used when importing through BigQuery load jobs and exporting through BigQuery extraction jobs (i.e. when using data extracts such as Parquet, Avro, ORC, ...). The service account specified in needs to be given appropriate rights.
  • (optional): BigQuery job priority when executing SQL queries. Options include "interactive" and "batch". Defaults to "interactive", i.e. the query is executed as soon as possible.
  • (optional): Timeout in milliseconds after which a file import/export job should be considered as failed. Defaults to 3600000 ms (= 1 h).

See the following resources for more information:

Schema Conversion

Using Direct Mode

In direct (streaming) mode, spark-bigquery performs the following data type conversions between supported Spark data types and BigQuery data types:

Importing to Spark / Exporting from BigQuery

| BigQuery Source Data Type | Spark Target Data Type |
|---|---|
| BOOL | BooleanType |
| INT64 | LongType |
| FLOAT64 | DoubleType |
| NUMERIC | DecimalType(38, 9) |
| STRING | StringType |
| BYTES | BinaryType |
| STRUCT | StructType |
| TIMESTAMP | TimestampType |
| DATE | DateType |
| TIME | StringType |
| DATETIME | StringType |

BigQuery repeated fields are mapped to the corresponding Spark ArrayType. Also, BigQuery's nullable mode is used to determine the appropriate nullable property of the target Spark data type.

Exporting from Spark / Importing to BigQuery

| Spark Source Data Type | BigQuery Target Data Type |
|---|---|
| BooleanType | BOOL |
| ByteType | INT64 |
| ShortType | INT64 |
| IntegerType | INT64 |
| LongType | INT64 |
| FloatType | FLOAT64 |
| DoubleType | FLOAT64 |
| <= DecimalType(38, 9) | NUMERIC |
| > DecimalType(38, 9) | STRING |
| StringType | STRING |
| BinaryType | BYTES |
| StructType | STRUCT |
| TimestampType | TIMESTAMP |
| DateType | DATE |
| MapType | Repeated key-value STRUCT |
| ArrayType | Repeated field |
| Other | STRING |

For more information on supported BigQuery data types see https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types.

When using intermediate GCS data extracts (Parquet, Avro, ORC, ...) the result depends on the data format being used. Consult the data format's specification for information on supported data types. Furthermore, see the following resources on type conversions supported by BigQuery:

Authentication

Providing a key file is only possible in local cluster mode, since an application deployed on Google Cloud will try to resolve the key file location on HDFS. It is also not good practice to keep key files stored as a cloud resource.

If you need to run via gcloud you can authenticate with service account JSON file using:

Using local cluster mode it is possible to provide the key file as an argument to the spark job.

Information on how to generate service account credentials can be found at https://cloud.google.com/storage/docs/authentication#service_accounts. The service account key file can either be passed directly as an option or through the GOOGLE_APPLICATION_CREDENTIALS environment variable (see https://cloud.google.com/docs/authentication/getting-started for more information). When running on Google Cloud, e.g. Google Cloud Dataproc, application default credentials may be used, in which case it is not necessary to specify a service account key file.

License

MIT License

Copyright (c) 2018 Mirai Solutions GmbH

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Source: https://opensourcelibs.com/lib/spark-bigquery

tfayyaz/spark-bigquery-connector

The connector supports reading Google BigQuery tables into Spark's DataFrames, and writing DataFrames back into BigQuery. This is done by using the Spark SQL Data Source API to communicate with BigQuery.

Beta Disclaimer

The BigQuery Storage API and this connector are in Beta and are subject to change.

Changes may include, but are not limited to:

  • Type conversion
  • Partitioning
  • Parameters

Breaking changes will be restricted to major and minor versions.

BigQuery Storage API

The Storage API streams data in parallel directly from BigQuery via gRPC without using Google Cloud Storage as an intermediary.

It has a number of advantages over using the previous export-based read flow that should generally lead to better read performance:

Direct Streaming

It does not leave any temporary files in Google Cloud Storage. Rows are read directly from BigQuery servers using an Avro wire format.

Filtering

The new API allows column and predicate filtering to only read the data you are interested in.

Column Filtering

Since BigQuery is backed by a columnar datastore, it can efficiently stream data without reading all columns.

Predicate Filtering

The Storage API supports arbitrary pushdown of predicate filters. Connector version 0.8.0-beta and above support pushdown of arbitrary filters to Bigquery.

There is a known issue in Spark that does not allow pushdown of filters on nested fields; such filters will not get pushed down to BigQuery.

Dynamic Sharding

The API rebalances records between readers until they all complete. This means that all Map phases will finish nearly concurrently. See this blog article on how dynamic sharding is similarly used in Google Cloud Dataflow.

See Configuring Partitioning for more details.

Requirements

Enable the BigQuery Storage API

Follow these instructions.

Create a Google Cloud Dataproc cluster (Optional)

If you do not have an Apache Spark environment you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use the connector on any cluster.

Any Dataproc cluster using the API needs the 'bigquery' or 'cloud-platform' scopes. Dataproc clusters have the 'bigquery' scope by default, so most clusters in enabled projects should work without further changes.

Downloading and Using the Connector

The latest version of the connector is publicly available in gs://spark-lib/bigquery/spark-bigquery-latest.jar. A Scala 2.12 compiled version exists in gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar.

The connector is also available from the Maven Central repository. It can be used via the --packages option or the spark.jars.packages configuration property. Use the following value:

| Scala version | Connector Artifact |
|---|---|
| Scala 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.13.1-beta |
| Scala 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.13.1-beta |

Hello World Example

You can run a simple PySpark wordcount against the API without compilation by running

Example Codelab

https://codelabs.developers.google.com/codelabs/pyspark-bigquery

Usage

The connector uses the cross language Spark SQL Data Source API:

Reading data from BigQuery

or the Scala only implicit API:

See Shakespeare.scala and shakespeare.py for more information.

Writing data to BigQuery

Writing a DataFrame to BigQuery is done in a similar manner. Notice that the process writes the data first to GCS and then loads it to BigQuery, a GCS bucket must be configured to indicate the temporary data location.

The data is temporarily stored using the Apache parquet format. An alternative format is Apache ORC.

The GCS bucket and the format can also be set globally using Spark's RuntimeConfig like this:

Important: The connector does not configure the GCS connector, in order to avoid conflict with another GCS connector, if one exists. In order to use the write capabilities of the connector, please configure the GCS connector on your cluster as explained here.

Properties

The API Supports a number of options to configure the read

Property | Meaning | Usage
The BigQuery table in the format . (Required)Read/Write
The dataset containing the table.
(Optional unless omitted in )
Read/Write
The Google Cloud Project ID of the table.
(Optional. Defaults to the project of the Service Account being used)
Read/Write
The Google Cloud Project ID of the table to bill for the export.
(Optional. Defaults to the project of the Service Account being used)
Read/Write
The maximal number of partitions to split the data into. Actual number may be less if BigQuery deems the data small enough. If there are not enough executors to schedule a reader per partition, some partitions may be empty.
Important: The old parameter () is still supported but in deprecated mode. It will be removed in version 1.0 of the connector.
(Optional. Defaults to one partition per 400MB. See Configuring Partitioning.)
Read
Enables the connector to read from views and not only tables. Please read the relevant section before activating this option.
(Optional. Defaults to )
Read
The project id where the materialized view is going to be created
(Optional. Defaults to view's project id)
Read
The dataset where the materialized view is going to be created
(Optional. Defaults to view's dataset)
Read
The connector uses an optimized empty projection (select without any columns) logic, used for count() execution. This logic takes the data directly from the table metadata or performs a much more efficient `SELECT COUNT(*) WHERE...` in case there is a filter. You can cancel the use of this logic by setting this option to .
(Optional, defaults to )
Read
Specifies whether the job is allowed to create new tables. The permitted values are:
  • - Configures the job to create the table if it does not exist.
  • - Configures the job to fail if the table does not exist.
This option takes place only in case Spark has decided to write data to the table based on the SaveMode.
(Optional. Default to CREATE_IF_NEEDED).
Write
The GCS bucket that temporarily holds the data before it is loaded to BigQuery. Required unless set in the Spark configuration (). Write
The format of the data before it is loaded to BigQuery, values can be either "parquet" or "orc".
(Optional. Defaults to ). On write only.
Write
If not set, the table is partitioned by pseudo column, referenced via either type, or type. If field is specified, the table is instead partitioned by this field. The field must be a top-level TIMESTAMP or DATE field. Its mode must be NULLABLE or REQUIRED.
(Optional).
Write
Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value.
(Optional).
Write
The only type supported is DAY, which will generate one partition per day.
(Optional. Default to DAY).
Write

Data types

With the exception of DATETIME and TIME, all BigQuery data types map directly into the corresponding Spark SQL data type. Here are all of the mappings:

BigQuery Standard SQL Data Type | Spark SQL Data Type | Notes
This preserves NUMERIC's full 38 digits of precision and 9 digits of scale.
Spark has no DATETIME type. Casting to TIMESTAMP uses a configured TimeZone, which defaults to the local timezone (UTC in GCE / Dataproc).

We are considering adding an optional TimeZone property to allow automatically converting to TimeStamp, this would be consistent with Spark's handling of CSV/JSON (except they always try to convert when inferring schema, and default to the local timezone)

Spark has no TIME type. The generated longs, which indicate microseconds since midnight can be safely cast to TimestampType, but this causes the date to be inferred as the current day. Thus times are left as longs and user can cast if they like.

When casting to Timestamp TIME have the same TimeZone issues as DATETIME

Filtering

The connector automatically computes column and predicate pushdown filters from the DataFrame's SELECT statement; for example, selecting a single column with a WHERE clause reads only that column from BigQuery and pushes the predicate filter down to the Storage API.

If you do not wish to make multiple read requests to BigQuery, you can cache the DataFrame before filtering, e.g. by calling df.cache() before applying the filters.

You can also manually specify the filter option, which will override automatic pushdown, and Spark will do the rest of the filtering in the client.

Partitioned Tables

The pseudo columns _PARTITIONDATE and _PARTITIONTIME are not part of the table schema. Therefore in order to query by the partitions of partitioned tables do not use the where() method shown above. Instead, add a filter option in the following manner:
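
For illustration, a hedged PySpark sketch that uses the connector's filter option to restrict the read by the _PARTITIONDATE pseudo column (the table name is a placeholder; `spark` is an existing SparkSession):

```python
df = spark.read.format("bigquery") \
    .option("table", "my_dataset.my_partitioned_table") \
    .option("filter", "_PARTITIONDATE > '2019-01-01'") \
    .load()
```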

Configuring Partitioning

By default the connector creates one partition per 400MB in the table being read (before filtering). This should roughly correspond to the maximum number of readers supported by the BigQuery Storage API. This can be configured explicitly with the property. BigQuery may limit the number of partitions based on server constraints.
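
As an illustration, a hedged sketch assuming the property referred to above is named maxParallelism (the actual name was lost in formatting here, so treat it as an assumption):

```python
# Ask for up to 400 read streams; BigQuery may return fewer.
df = spark.read.format("bigquery") \
    .option("table", "my_dataset.my_large_table") \
    .option("maxParallelism", "400") \
    .load()
```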

Reading From Views

The connector has a preliminary support for reading from BigQuery views. Please note there are a few caveats:

  • BigQuery views are not materialized by default, which means that the connector needs to materialize them before it can read them. This process affects the read performance, even before running any collect() or count() action.
  • The materialization process can also incur additional costs to your BigQuery bill.
  • By default, the materialized views are created in the same project and dataset. Those can be configured by the optional materializationProject and materializationDataset options, respectively. These options can also be set globally via the Spark configuration before reading the views.
  • Reading from views is disabled by default. In order to enable it, either set the viewsEnabled option when reading the specific view or set it globally in the Spark configuration, as shown in the sketch after this list.
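
A minimal PySpark sketch of both approaches, with placeholder names for the view and the materialization dataset (`spark` is an existing SparkSession):

```python
# Per-read: enable view support only for this DataFrame.
df = spark.read.format("bigquery") \
    .option("viewsEnabled", "true") \
    .option("materializationDataset", "my_materialization_dataset") \
    .option("table", "my_dataset.my_view") \
    .load()

# Global: enable view support for every read in this SparkSession.
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "my_materialization_dataset")
```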

Using in Jupyter Notebooks

The connector can be used in Jupyter notebooks even if it is not installed on the Spark cluster. It can be added as an external jar when creating the Spark session, using the following code:

Python:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.13.1-beta") \
    .getOrCreate()

df = spark.read.format("bigquery") \
    .option("table", "dataset.table") \
    .load()
```

Scala:

```scala
val spark = SparkSession.builder
  .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.13.1-beta")
  .getOrCreate()

val df = spark.read.format("bigquery")
  .option("table", "dataset.table")
  .load()
```

In case Spark cluster is using Scala 2.12 (it's optional for Spark 2.4.x, mandatory in 3.0.x), then the relevant package is com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.13.1-beta. In order to know which Scala version is used, please run the following code:

Python:

spark.sparkContext._jvm.scala.util.Properties.versionString()

Scala:

scala.util.Properties.versionString

Compiling against the connector

Unless you wish to use the implicit Scala API, there is no need to compile against the connector.

To include the connector in your project:

Maven

```xml
<dependency>
  <groupId>com.google.cloud.spark</groupId>
  <artifactId>spark-bigquery-with-dependencies_${scala.version}</artifactId>
  <version>0.13.1-beta</version>
</dependency>
```

SBT

```scala
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.13.1-beta"
```

Building the Connector

The connector is built using SBT. The following command creates a jar with shaded dependencies:

FAQ

What is the Pricing for the Storage API?

See the BigQuery pricing documentation.

I have very few partitions

You can manually set the number of partitions with the property. BigQuery may provide fewer partitions than you ask for. See Configuring Partitioning.

You can also always repartition after reading in Spark.

How do I authenticate outside GCE / Dataproc?

Use a service account JSON key and the GOOGLE_APPLICATION_CREDENTIALS environment variable as described here.

Credentials can also be provided explicitly either as a parameter or from Spark runtime configuration. It can be passed in as a base64-encoded string directly, or a file path that contains the credentials (but not both).

For example, the base64-encoded string can be supplied as a read/write option or set in the Spark runtime configuration; alternatively, specify the credentials file name in either of those two ways.
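
A hedged sketch of both variants, assuming the credentials (base64-encoded key) and credentialsFile option keys documented for this connector; all values are placeholders:

```python
# Pass the service account key directly as a base64-encoded string...
df = spark.read.format("bigquery") \
    .option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>") \
    .option("table", "my_dataset.my_table") \
    .load()

# ...or point the connector at a key file via the runtime configuration instead.
spark.conf.set("credentialsFile", "/path/to/service-account-key.json")
```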

Source: https://githubmemory.com/repo/tfayyaz/spark-bigquery-connector

Apache Spark SQL connector for Google BigQuery (Beta)

The connector supports reading Google BigQuery tables into Spark's DataFrames, and writing DataFrames back into BigQuery. This is done by using the Spark SQL Data Source API to communicate with BigQuery.

Beta Disclaimer

The BigQuery Storage API and this connector are in Beta and are subject to change.

Changes may include, but are not limited to:

  • Type conversion
  • Partitioning
  • Parameters

Breaking changes will be restricted to major and minor versions.

BigQuery Storage API

The Storage API streams data in parallel directly from BigQuery via gRPC without using Google Cloud Storage as an intermediary.

It has a number of advantages over using the previous export-based read flow that should generally lead to better read performance:

Direct Streaming

It does not leave any temporary files in Google Cloud Storage. Rows are read directly from BigQuery servers using the Arrow or Avro wire formats.

Filtering

The new API allows column and predicate filtering to only read the data you are interested in.

Column Filtering

Since BigQuery is backed by a columnar datastore, it can efficiently stream data without reading all columns.

Predicate Filtering

The Storage API supports arbitrary pushdown of predicate filters. Connector version 0.8.0-beta and above support pushdown of arbitrary filters to Bigquery.

There is a known issue in Spark that does not allow pushdown of filters on nested fields; such filters will not get pushed down to BigQuery.

Dynamic Sharding

The API rebalances records between readers until they all complete. This means that all Map phases will finish nearly concurrently. See this blog article on how dynamic sharding is similarly used in Google Cloud Dataflow.

See Configuring Partitioning for more details.

Requirements

Enable the BigQuery Storage API

Follow these instructions.

Create a Google Cloud Dataproc cluster (Optional)

If you do not have an Apache Spark environment you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use the connector on any cluster.

Any Dataproc cluster using the API needs the 'bigquery' or 'cloud-platform' scopes. Dataproc clusters have the 'bigquery' scope by default, so most clusters in enabled projects should work without further changes.

Downloading and Using the Connector

The latest version of the connector is publicly available in the following links:

versionLink
Scala 2.11 (HTTP link)
Scala 2.12 (HTTP link)

The connector is also available from the Maven Central repository. It can be used via the --packages option or the spark.jars.packages configuration property. Use the following value:

| Scala version | Connector Artifact |
|---|---|
| Scala 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.13.1-beta |
| Scala 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.13.1-beta |

If you want to keep up with the latest version of the connector the following links can be used. Notice that for production environments where the connector version should be pinned, one of the above links should be used.

versionLink
Scala 2.11 (HTTP link)
Scala 2.12 (HTTP link)

Hello World Example

You can run a simple PySpark wordcount against the API without compilation by running

Dataproc image 1.5 and above

Dataproc image 1.4 and below

Example Codelab

https://codelabs.developers.google.com/codelabs/pyspark-bigquery

Usage

The connector uses the cross language Spark SQL Data Source API:

Reading data from a BigQuery table

or the Scala only implicit API:

For more information, see additional code samples in Python, Scala and Java.

Reading data from a BigQuery query

The connector allows you to run any Standard SQL SELECT query on BigQuery and fetch its results directly to a Spark Dataframe. This is easily done as described in the following code sample:

The result of the query is returned as a regular Spark DataFrame.

A second option is to use the query option like this:

Notice that the execution should be faster as only the result is transmitted over the wire. In a similar fashion the queries can include JOINs more efficiently than running joins on Spark, or use other BigQuery features such as subqueries, BigQuery user defined functions, wildcard tables, BigQuery ML and more.

In order to use this feature the following configurations MUST be set:

  • viewsEnabled must be set to true.
  • materializationDataset must be set to a dataset where the GCP user has table creation permission. materializationProject is optional.

Note: As mentioned in the BigQuery documentation, the queried tables must be in the same location as the materializationDataset. Also, if the tables in the query are from projects other than the materializationProject, then use the fully qualified table name, i.e. project.dataset.table.

Important: This feature is implemented by running the query on BigQuery and saving the result into a temporary table, of which Spark will read the results from. This may add additional costs on your BigQuery account.

Reading From Views

The connector has a preliminary support for reading from BigQuery views. Please note there are a few caveats:

  • BigQuery views are not materialized by default, which means that the connector needs to materialize them before it can read them. This process affects the read performance, even before running any collect() or count() action.
  • The materialization process can also incur additional costs to your BigQuery bill.
  • By default, the materialized views are created in the same project and dataset. Those can be configured by the optional materializationProject and materializationDataset options, respectively. These options can also be set globally via the Spark configuration before reading the views.
  • Reading from views is disabled by default. In order to enable it, either set the viewsEnabled option when reading the specific view or set it globally in the Spark configuration.
  • As mentioned in the BigQuery documentation, the materializationDataset should be in the same location as the view.

Writing data to BigQuery

Writing a DataFrame to BigQuery is done in a similar manner. Notice that the process writes the data first to GCS and then loads it to BigQuery, a GCS bucket must be configured to indicate the temporary data location.

The data is temporarily stored using the Apache parquet format. An alternative format is Apache ORC.

The GCS bucket and the format can also be set globally using Spark's RuntimeConfig like this:

When streaming a DataFrame to BigQuery, each batch is written in the same manner as a non-streaming DataFrame. Note that an HDFS-compatible checkpoint location (e.g., a path on HDFS or in a GCS bucket) must be specified.

Important: The connector does not configure the GCS connector, in order to avoid conflict with another GCS connector, if exists. In order to use the write capabilities of the connector, please configure the GCS connector on your cluster as explained here.

Properties

The API Supports a number of options to configure the read

Property | Meaning | Usage
The BigQuery table in the format . It is recommended to use the parameter of / instead. This option has been deprecated and will be removed in a future version.
(Deprecated)
Read/Write
The dataset containing the table. This option should be used with standard table and views, but not when loading query results.
(Optional unless omitted in )
Read/Write
The Google Cloud Project ID of the table. This option should be used with standard table and views, but not when loading query results.
(Optional. Defaults to the project of the Service Account being used)
Read/Write
The Google Cloud Project ID of the table to bill for the export.
(Optional. Defaults to the project of the Service Account being used)
Read/Write
The maximal number of partitions to split the data into. Actual number may be less if BigQuery deems the data small enough. If there are not enough executors to schedule a reader per partition, some partitions may be empty.
Important: The old parameter () is still supported but in deprecated mode. It will be removed in version 1.0 of the connector.
(Optional. Defaults to one partition per 400MB. See Configuring Partitioning.)
Read
Enables the connector to read from views and not only tables. Please read the relevant section before activating this option.
(Optional. Defaults to )
Read
The project id where the materialized view is going to be created
(Optional. Defaults to view's project id)
Read
The dataset where the materialized view is going to be created. This dataset should be in same location as the view or the queried tables.
(Optional. Defaults to view's dataset)
Read
The expiration time of the temporary table holding the materialized data of a view or a query, in minutes. Notice that the connector may re-use the temporary table due to the use of local cache and in order to reduce BigQuery computation, so very low values may cause errors. The value must be a positive integer.
(Optional. Defaults to 1440, or 24 hours)
Read
Data Format for reading from BigQuery. Options : , Unsupported Arrow filters are not pushed down and results are filtered later by Spark. (Currently Arrow does not suport disjunction across columns).
(Optional. Defaults to )
Read
The connector uses an optimized empty projection (select without any columns) logic, used for count() execution. This logic takes the data directly from the table metadata or performs a much more efficient `SELECT COUNT(*) WHERE...` in case there is a filter. You can cancel the use of this logic by setting this option to .
(Optional, defaults to )
Read
If set to , the connector pushes all the filters Spark can delegate to BigQuery Storage API. This reduces amount of data that needs to be sent from BigQuery Storage API servers to Spark clients.
(Optional, defaults to )
Read
Specifies whether the job is allowed to create new tables. The permitted values are:
  • - Configures the job to create the table if it does not exist.
  • - Configures the job to fail if the table does not exist.
This option takes place only in case Spark has decided to write data to the table based on the SaveMode.
(Optional. Default to CREATE_IF_NEEDED).
Write
The GCS bucket that temporarily holds the data before it is loaded to BigQuery. Required unless set in the Spark configuration (). Write
The GCS bucket that holds the data before it is loaded to BigQuery. If informed, the data won't be deleted after write data into BigQuery. Write
The GCS path that holds the data before it is loaded to BigQuery. Used only with . Write
The format of the data before it is loaded to BigQuery, values can be either "parquet","orc" or "avro". In order to use the Avro format, the spark-avro package must be added in runtime.
(Optional. Defaults to ). On write only.
Write
The date partition the data is going to be written to. Should be given in the format . Can be used to overwrite the data of a single partition, like this:
(Optional). On write only.
Write
If field is specified together with `partitionType`, the table is partitioned by this field. The field must be a top-level TIMESTAMP or DATE field. Its mode must be NULLABLE or REQUIRED. If the option is not set for a partitioned table, then the table will be partitioned by pseudo column, referenced via either type, or type.
(Optional).
Write
Number of milliseconds for which to keep the storage for partitions in the table. The storage in a partition will have an expiration time of its partition time plus this value.
(Optional).
Write
The only type supported is DAY, which will generate one partition per day. This option is mandatory for a target table to be partitioned.
(Optional. Defaults to DAY if PartitionField is specified).
Write
Comma separated list of non-repeated, top level columns. Clustering is only supported for partitioned tables
(Optional).
Write
Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are and .
(Optional. Default to ).
Write
Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are and .
(Optional. Default to ).
Write
Address of the proxy server. The proxy must be a HTTP proxy and address should be in the `host:port` format. Can be alternatively set in the Spark configuration () or in Hadoop Configuration ().
(Optional. Required only if connecting to GCP via proxy.)
Read/Write
The userName used to connect to the proxy. Can be alternatively set in the Spark configuration () or in Hadoop Configuration ().
(Optional. Required only if connecting to GCP via proxy with authentication.)
Read/Write
The password used to connect to the proxy. Can be alternatively set in the Spark configuration () or in Hadoop Configuration ().
(Optional. Required only if connecting to GCP via proxy with authentication.)
Read/Write
The maximum number of retries for the low-level HTTP requests to BigQuery. Can be alternatively set in the Spark configuration () or in Hadoop Configuration ().
(Optional. Default is 10)
Read/Write
The timeout in milliseconds to establish a connection with BigQuery. Can be alternatively set in the Spark configuration () or in Hadoop Configuration ().
(Optional. Default is 60000 ms. 0 for an infinite timeout, a negative number for 20000)
Read/Write
The timeout in milliseconds to read data from an established connection. Can be alternatively set in the Spark configuration () or in Hadoop Configuration ().
(Optional. Default is 60000 ms. 0 for an infinite timeout, a negative number for 20000)
Read

Options can also be set outside of the code, using the parameter of or parameter of the . In order to use this, prepend the prefix to any of the options, for example can also be set as .

Data types

With the exception of DATETIME and TIME, all BigQuery data types map directly into the corresponding Spark SQL data type. Here are all of the mappings:

BigQuery Standard SQL Data Type | Spark SQL Data Type | Notes
This preserves NUMERIC's full 38 digits of precision and 9 digits of scale.
Spark has no DATETIME type. Casting to TIMESTAMP uses a configured TimeZone, which defaults to the local timezone (UTC in GCE / Dataproc).

We are considering adding an optional TimeZone property to allow automatically converting to TimeStamp, this would be consistent with Spark's handling of CSV/JSON (except they always try to convert when inferring schema, and default to the local timezone)

Spark has no TIME type. The generated longs, which indicate microseconds since midnight can be safely cast to TimestampType, but this causes the date to be inferred as the current day. Thus times are left as longs and user can cast if they like.

When casting to Timestamp TIME have the same TimeZone issues as DATETIME

Source: https://github.com/GoogleCloudDataproc/spark-bigquery-connector
