BDR Automation Examples

You can use the Cloudera Manager API to automate BDR tasks, such as creating a schedule for a replication. For more information about the Cloudera Manager API, see Cloudera Manager API.

To use the following examples, substitute in the information for your clusters:

Create a Replication Schedule

The following example shows how to create an HDFS replication schedule:

import cm_api
import logging
from cm_api.api_client import ApiResource, ApiException, API_CURRENT_VERSION
from cm_api.endpoints.types import *
from cm_api.endpoints.services import ApiService

# Connect to the Cloudera Manager servers of the target and source clusters.
target_api = ApiResource(server_host="TARGET-CM-SERVER", version=18)
source_api = ApiResource(server_host="SOURCE-CM-SERVER", version=18)

# Optional: uncomment to log at INFO level while debugging.
# logging.basicConfig(level=logging.INFO)

# Look up the target cluster, its HDFS service, and the YARN (or MapReduce)
# service whose name is passed in the replication arguments.
target_cluster = target_api.get_cluster('REPLACE-ME-WITH-TARGET-CLUSTER-NAME')
target_hdfs = target_cluster.get_service('REPLACE-ME-WITH-TARGET-HDFS-NAME')
target_mr = target_cluster.get_service('REPLACE-ME-WITH-TARGET-YARN-NAME')

# Look up the source cluster and its HDFS service. The service lookup takes
# the HDFS service name, not the cluster name.
source_cluster = source_api.get_cluster('REPLACE-ME-WITH-SOURCE-CLUSTER-NAME')
source_hdfs = source_cluster.get_service('REPLACE-ME-WITH-SOURCE-HDFS-NAME')

# The peer is looked up by name on the target Cloudera Manager.
peer = target_api.get_cloudera_manager().get_peer('REPLACE-ME-WITH-PEER-NAME-CONFIGURED-IN-TARGET-CLUSTER')

# setup replication schedule arguments
def hdfs_replication_schedule():
    """Build the arguments object for an HDFS replication schedule.

    Reads the module-level ``peer``, ``source_cluster``, ``source_hdfs`` and
    ``target_mr`` objects defined earlier in this script.

    Returns:
        An ``ApiHdfsReplicationArguments`` instance populated with the source
        service reference, the source and destination paths, and the
        replication options set below.
    """
    # List of attributes: https://github.com/cloudera/cm_api/blob/master/python/src/cm_api/endpoints/types.py#L686-L708
    hdfs_args = ApiHdfsReplicationArguments(None)

    # Reference to the source HDFS service, addressed through the peer.
    hdfs_args.sourceService = ApiServiceRef(None, peerName=peer.name,
                                            clusterName=source_cluster.name,
                                            serviceName=source_hdfs.name)

    # What to replicate and where to put it.
    hdfs_args.sourcePath = '/tmp/src'
    hdfs_args.destinationPath = '/tmp/dest'
    hdfs_args.exclusionFilters = []

    # Job sizing and strategy.
    hdfs_args.numMaps = 20
    hdfs_args.bandwidthPerMap = 100
    hdfs_args.replicationStrategy = "DYNAMIC"
    hdfs_args.mapreduceServiceName = target_mr.name  # YARN or MAPREDUCE
    hdfs_args.userName = None

    # Run behavior.
    hdfs_args.dryRun = False
    hdfs_args.abortOnError = False
    hdfs_args.removeMissingFiles = False  # set once; the original set it twice
    hdfs_args.skipChecksumChecks = False
    hdfs_args.skipTrash = False

    # What to preserve on the destination.
    hdfs_args.preserveReplicationCount = True
    hdfs_args.preserveBlockSize = True
    hdfs_args.preservePermissions = True
    hdfs_args.preserveXAttrs = False

    return hdfs_args

# Create the schedule on the target HDFS service, using the arguments built
# by hdfs_replication_schedule(). All alerts are disabled in this example.
schedule = target_hdfs.create_replication_schedule(
    start_time=None,
    end_time=None,
    interval_unit='MINUTE',
    interval=0,
    paused=False,
    arguments=hdfs_replication_schedule(),
    alert_on_start=False,
    alert_on_success=False,
    alert_on_fail=False,
    alert_on_abort=False,
)

Run a Replication Schedule

The following example shows how to run a replication schedule that you create with the Cloudera Manager Admin Console or the API:

# Trigger a run of the replication schedule, identified by its id.
# `schedule` is a replication schedule object, such as the one assigned from
# create_replication_schedule() in the previous example.
target_hdfs.trigger_replication_schedule(schedule.id)

Update a Replication Schedule Name

The following example shows how to update the name for a replication schedule:

import cm_api
from cm_api.api_client import ApiResource
from cm_api.endpoints.types import *

# Workaround for OPSAPS-43521: add a 'displayName' entry to the schedule
# type's attribute table. This object is passed as the return type of the
# PUT request at the end of the script.
replicationSchedule = ApiReplicationSchedule(None)
replicationSchedule._ATTRIBUTES['displayName'] = None

# Connect to Cloudera Manager and locate the HDFS service that owns the schedule.
api = ApiResource(server_host="REPLACE-ME-WITH-CM-SERVER", version=18)
cluster = api.get_cluster('REPLACE-ME-WITH-CLUSTER-NAME')
service_hdfs = cluster.get_service('REPLACE-ME-WITH-HDFS-NAME')

# Fetch the schedule to modify; 32 is an example schedule id.
schedule = service_hdfs.get_replication_schedule(32)

# Convert the schedule to its JSON dictionary form and set the new name.
schedule_items = ApiList([schedule]).to_json_dict()['items']
data = schedule_items[0]
data['displayName'] = "UniqueName"

# Send the modified schedule back with a PUT on its "replications/<id>" path.
service_hdfs._put(
    "replications/%s" % schedule.id,
    replicationSchedule,
    data=data,
)