Copying data from BigQuery to Postgres (Cloud SQL)

Google provides a database migration service, but at the time of this writing it does not support copying data from BigQuery to Postgres (Cloud SQL). You can DIY in a couple of ways; here I describe one using Dataflow. At a high level, you will create a Pipeline and run it like this (I am using Java):

Pipeline p = Pipeline.create(options);
p
    .apply(BigQueryIO.readTableRows().fromQuery(readSql).usingStandardSql())
    .apply(JdbcIO.<TableRow>write()
        .withDataSourceConfiguration(pgConfiguration)
        .withStatement(insertSql)
        .withPreparedStatementSetter(insertHelper)
    );
p.run().waitUntilFinish();
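
Here options is built from the --name=value flags passed on the command line (see the run command at the end of this post). A minimal sketch, assuming any non-flag arguments such as the properties file have already been removed from pipelineArgs:

// DataflowPipelineOptions lives in org.apache.beam.runners.dataflow.options.
// pipelineArgs holds only the "--name=value" flags from the command line.
DataflowPipelineOptions options = PipelineOptionsFactory
    .fromArgs(pipelineArgs)
    .withValidation()
    .as(DataflowPipelineOptions.class);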

In the pipeline above, readSql is given by:

String readSql = String.format("select %s from %s.%s", config.queryColumns, config.gcpDataset, config.gcpTable);
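
For example, with queryColumns = "colA, colB, colC", gcpDataset = "my_dataset", and gcpTable = "my_table" (made-up values), readSql expands to:

select colA, colB, colC from my_dataset.my_table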

The pgConfiguration is created as:

DataSourceConfiguration pgConfiguration = JdbcIO.DataSourceConfiguration.create(POSTGRES_DRIVER, pgConnectionString);

The pgConnectionString will be of the form:

jdbc:postgresql:///${pgDatabase}?cloudSqlInstance=${gcpProjectId}:${pgRegion}:${pgInstance}&socketFactory=com.google.cloud.sql.postgres.SocketFactory&user=${pgUsername}&password=${pgPassword}&useSSL=true
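
Here POSTGRES_DRIVER is the standard PostgreSQL JDBC driver class, and the socketFactory class is provided by the com.google.cloud.sql:postgres-socket-factory artifact. A sketch of how the string could be assembled, with pgDatabase, gcpProjectId, etc. being placeholders from your own configuration:

static final String POSTGRES_DRIVER = "org.postgresql.Driver";

String pgConnectionString = String.format(
    "jdbc:postgresql:///%s?cloudSqlInstance=%s:%s:%s"
        + "&socketFactory=com.google.cloud.sql.postgres.SocketFactory"
        + "&user=%s&password=%s&useSSL=true",
    pgDatabase, gcpProjectId, pgRegion, pgInstance, pgUsername, pgPassword);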

insertSql is the SQL command that inserts a row into the target table and will be of the form:

insert into XYZ (colA, colB, colC) values (?, ?, ?)
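
If you already have the column list in config, you can build insertSql programmatically. A rough sketch, where config.pgTable is a hypothetical field holding the target table name:

// Build "?,?,..." with one placeholder per column (needs java.util.Collections).
String[] columns = config.queryColumns.split(",");
String placeholders = String.join(",", Collections.nCopies(columns.length, "?"));
String insertSql = String.format(
    "insert into %s (%s) values (%s)",
    config.pgTable, config.queryColumns, placeholders);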

The insertHelper has to implement JdbcIO.PreparedStatementSetter<TableRow> and override:

public void setParameters(TableRow element, PreparedStatement query) throws Exception

The first argument, element, will contain a row from BigQuery, and the second argument, query, will contain the prepared statement that inserts the row into Postgres. The job of insertHelper is to set the query parameters based on element. This is where you will do a lot of the work: you have to parse each column of element and, depending on the column's data type, convert it to the appropriate Java object. E.g.:

Object o = element.get(columnName);
switch (typeName.toUpperCase()) {
    case "NUMERIC":
        // https://www.instaclustr.com/blog/postgresql-data-types-mappings-to-sql-jdbc-and-java-data-types/
        query.setBigDecimal(parameterIndex, Utils.parseBigDecimal(o, this.scale, this.mode));
        break;
    case "STRING":
        query.setString(parameterIndex, Utils.parseString(o));
        break;
    case "TIMESTAMP":
        query.setTimestamp(parameterIndex, Utils.parseTimestamp(o));
        break;
    case "INT64":
        query.setLong(parameterIndex, Utils.parseLong(o));
        break;
    default:
        throw new NotImplementedException();
}
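
Putting this together, here is a minimal sketch of what such a setter class might look like. The class name, constructor, and the way the column names and BigQuery type names are passed in are just one possible arrangement (Utils refers to the same helper methods used above, and only two of the type cases are repeated):

import java.sql.PreparedStatement;
import java.util.List;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.commons.lang3.NotImplementedException;

public class InsertHelper implements JdbcIO.PreparedStatementSetter<TableRow> {

    // Column names and their BigQuery type names, in the same order as the
    // placeholders in insertSql. JdbcIO.PreparedStatementSetter extends
    // Serializable, so keep these fields serializable.
    private final List<String> columnNames;
    private final List<String> typeNames;

    public InsertHelper(List<String> columnNames, List<String> typeNames) {
        this.columnNames = columnNames;
        this.typeNames = typeNames;
    }

    @Override
    public void setParameters(TableRow element, PreparedStatement query) throws Exception {
        for (int i = 0; i < columnNames.size(); i++) {
            int parameterIndex = i + 1; // JDBC parameters are 1-based
            Object o = element.get(columnNames.get(i));
            switch (typeNames.get(i).toUpperCase()) {
                case "STRING":
                    query.setString(parameterIndex, Utils.parseString(o));
                    break;
                case "INT64":
                    query.setLong(parameterIndex, Utils.parseLong(o));
                    break;
                // ... handle NUMERIC, TIMESTAMP, etc. as in the snippet above ...
                default:
                    throw new NotImplementedException();
            }
        }
    }
}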

You need to add a number of Maven dependencies (the full pom.xml is left as an exercise). Most are found under the groupId org.apache.beam; I used version 2.44.0 at the time of this writing.
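
For reference, these are the artifacts (groupId:artifactId) I would expect such a pipeline to need; your exact list and versions may differ:

org.apache.beam:beam-sdks-java-core
org.apache.beam:beam-sdks-java-io-google-cloud-platform    (BigQueryIO)
org.apache.beam:beam-sdks-java-io-jdbc                      (JdbcIO)
org.apache.beam:beam-runners-google-cloud-dataflow-java     (DataflowRunner)
org.postgresql:postgresql                                   (PostgreSQL JDBC driver)
com.google.cloud.sql:postgres-socket-factory                (Cloud SQL socket factory)
org.apache.commons:commons-lang3                            (NotImplementedException)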

Finally you can run the program like this:

mvn compile exec:java -e \
-Dexec.mainClass=your.main.class \
-Dexec.args="$PROPERTIES_FILE \
--project=gcp-project \
--region=us-central1 \
--stagingLocation=gs://your/staging/location \
--tempLocation=gs://your/temp/location \
--runner=DataflowRunner \
--diskSizeGb=100 \
--numberOfWorkerHarnessThreads=8 \
--workerMachineType=n1-highmem-8 \
--maxNumWorkers=4
"

The stagingLocation is where the JAR is uploaded, and the tempLocation is needed to store intermediate Avro files (these get deleted after job completion). I changed the default workerMachineType because otherwise I got an OOM exception. If your Postgres instance only has a private IP for security reasons, you will also need to pass the --network and --subnetwork flags.
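
For example, added to exec.args like the other flags (hypothetical VPC and subnet names; Dataflow expects the subnetwork in the regions/<region>/subnetworks/<name> form):

--network=your-vpc-name \
--subnetwork=regions/us-central1/subnetworks/your-subnet-name \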

With this you should be able to successfully copy tables containing terabytes of data from BigQuery to Postgres.
