Google provides a database migration service, but at the time of this writing it does not support copying data from BigQuery to Postgres (Cloud SQL). You can DIY in a couple of ways; here I describe one using Dataflow. At a high level, you create a Pipeline and run it like this (I am using Java):
Pipeline p = Pipeline.create(options);
p
    .apply(BigQueryIO.readTableRows().fromQuery(readSql).usingStandardSql())
    .apply(JdbcIO.<TableRow>write()
        .withDataSourceConfiguration(pgConfiguration)
        .withStatement(insertSql)
        .withPreparedStatementSetter(insertHelper)
    );
p.run().waitUntilFinish();
where readSql is given by:
String readSql = String.format("select %s from %s.%s", config.queryColumns, config.gcpDataset, config.gcpTable);
and pgConfiguration is created as:
DataSourceConfiguration pgConfiguration = JdbcIO.DataSourceConfiguration.create(POSTGRES_DRIVER, pgConnectionString);
The pgConnectionString will be of the form:
jdbc:postgresql:///${pgDatabase}?cloudSqlInstance=${gcpProjectId}:${pgRegion}:${pgInstance}&socketFactory=com.google.cloud.sql.postgres.SocketFactory&user=${pgUsername}&password=${pgPassword}&useSSL=true
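In Java this string can be assembled with String.format. The following is a minimal sketch; the class and parameter names are mine, and all values passed in would be placeholders you replace with your own:

```java
// Hypothetical helper that assembles the Cloud SQL JDBC URL from its parts.
class PgUrl {
    static String connectionString(String db, String project, String region,
                                   String instance, String user, String password) {
        return String.format(
            "jdbc:postgresql:///%s?cloudSqlInstance=%s:%s:%s"
                + "&socketFactory=com.google.cloud.sql.postgres.SocketFactory"
                + "&user=%s&password=%s&useSSL=true",
            db, project, region, instance, user, password);
    }
}
```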
insertSql is the SQL statement that inserts a row into the target table and will be of the form:
insert into XYZ (colA, colB, ColC) values (?,?,?)
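Since the "?" placeholders must match the column list exactly, it is worth deriving the statement from the same column list you use elsewhere. A small sketch (the helper class name is mine):

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper: builds the parameterized INSERT from a table name and
// column list, keeping the "?" count in sync with the number of columns.
class InsertSqlBuilder {
    static String build(String table, List<String> columns) {
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        return String.format("insert into %s (%s) values (%s)",
            table, String.join(", ", columns), placeholders);
    }
}
```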
The insertHelper has to implement JdbcIO.PreparedStatementSetter<TableRow> and override:
public void setParameters(TableRow element, PreparedStatement query) throws Exception
The first argument, element, contains a row from BigQuery, and the second argument, query, is the prepared statement that inserts the row into Postgres. The job of insertHelper is to set the statement's parameters from element. This is where most of the work happens: you have to parse each column of element and, depending on the column's data type, convert it to the appropriate Java object. E.g.:
Object o = element.get(columnName);
switch (typeName.toUpperCase()) {
    case "NUMERIC":
        // https://www.instaclustr.com/blog/postgresql-data-types-mappings-to-sql-jdbc-and-java-data-types/
        query.setBigDecimal(parameterIndex, Utils.parseBigDecimal(o, this.scale, this.mode));
        break;
    case "STRING":
        query.setString(parameterIndex, Utils.parseString(o));
        break;
    case "TIMESTAMP":
        query.setTimestamp(parameterIndex, Utils.parseTimestamp(o));
        break;
    case "INT64":
        query.setLong(parameterIndex, Utils.parseLong(o));
        break;
    default:
        throw new NotImplementedException();
}
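The Utils.parse* helpers here are user code, not Beam APIs. A minimal sketch of some of them, under the assumption that TableRow values may arrive either as strings or as boxed numbers depending on how the rows were materialized (parseTimestamp is omitted because its wire format depends on the read path):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical sketches of the conversion helpers; null handling is kept minimal.
class Utils {
    static Long parseLong(Object o) {
        if (o == null) return null;
        if (o instanceof Number) return ((Number) o).longValue();
        return Long.parseLong(o.toString());
    }

    static BigDecimal parseBigDecimal(Object o, int scale, RoundingMode mode) {
        if (o == null) return null;
        return new BigDecimal(o.toString()).setScale(scale, mode);
    }

    static String parseString(Object o) {
        return o == null ? null : o.toString();
    }
}
```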
You need to add a number of Maven dependencies (left as an exercise). Most are found under groupId org.apache.beam. I used version 2.44.0 at the time of this writing.
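The dependency set will look roughly like the following (these artifact IDs are the standard ones for the pieces used in this post, but verify them against your own build; versions for the non-Beam artifacts are omitted, so pick current releases):

```xml
<!-- Beam core, the Dataflow runner, and the BigQuery/JDBC IO connectors -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.44.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>2.44.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>2.44.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-jdbc</artifactId>
  <version>2.44.0</version>
</dependency>
<!-- Postgres JDBC driver and the Cloud SQL socket factory named in the connection string -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
</dependency>
<dependency>
  <groupId>com.google.cloud.sql</groupId>
  <artifactId>postgres-socket-factory</artifactId>
</dependency>
```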
Finally, you can run the program like this:
mvn compile exec:java -e \
-Dexec.mainClass=your.main.class \
-Dexec.args="$PROPERTIES_FILE \
--project=gcp-project \
--region=us-central1 \
--stagingLocation=gs://your/staging/location \
--tempLocation=gs://your/temp/location \
--runner=DataflowRunner \
--diskSizeGb=100 \
--numberOfWorkerHarnessThreads=8 \
--workerMachineType=n1-highmem-8 \
--maxNumWorkers=4
"
The stagingLocation is where the JAR is uploaded, and the tempLocation is used to store intermediate Avro files (these are deleted after the job completes). I changed the default workerMachineType because otherwise I got an OOM exception. If your Postgres instance uses a private IP for security reasons, you will also need the --network and --subnetwork flags.
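For example (the network and subnetwork names below are placeholders; Dataflow accepts the short regions/... form for --subnetwork, and --usePublicIps=false keeps the workers off public IPs):

```shell
--network=your-vpc-network \
--subnetwork=regions/us-central1/subnetworks/your-subnet \
--usePublicIps=false
```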
With this you should be able to copy tables holding terabytes of data from BigQuery to Postgres.