Mid-Stream Changing Configuration with Check-Pointed Spark Stream


I have a Spark Streaming / DStream app like this:

// Function to create and set up a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on the context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

where the context uses a configuration file from which I can pull items with methods like appConf.getString. So I use:

val context = StreamingContext.getOrCreate(
    appConf.getString("spark.checkpointdirectory"),
    () => createStreamContext(sparkConf, appConf))

where val sparkConf = new SparkConf()....

If I stop the app and change the configuration in the app file, these changes are not picked up unless I delete the checkpoint directory contents. For example, I would like to change spark.streaming.kafka.maxRatePerPartition or spark.windowDurationSecs dynamically. (Edit: I kill the app, change the configuration file, and then restart the app.) How can I dynamically change these settings or enforce a configuration change without trashing my checkpoint directory (which will soon include checkpointed state information)?


If you dive into the code for StreamingContext.getOrCreate:

def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
  val checkpointOption = CheckpointReader.read(
    checkpointPath, new SparkConf(), hadoopConf, createOnError)
  checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}

You can see that if CheckpointReader finds checkpointed data at the checkpoint path, it uses a plain new SparkConf() as the parameter, because the overload does not allow passing a custom-created SparkConf. By default, SparkConf loads only the settings declared as system properties or found on the classpath:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)
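To illustrate the point, here is a minimal sketch (not from the original post; ConfProbe is just an illustrative name) showing that a default-constructed SparkConf picks up any spark.* setting supplied as a JVM system property, for example via spark-submit --conf spark.streaming.kafka.maxRatePerPartition=100:

import org.apache.spark.SparkConf

object ConfProbe {
  def main(args: Array[String]): Unit = {
    // loadDefaults = true, so all spark.* JVM system properties are read in
    val conf = new SparkConf()
    // prints Some(100) if the property was supplied at launch time, None otherwise
    println(conf.getOption("spark.streaming.kafka.maxRatePerPartition"))
  }
}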

So one way of achieving what you want is, instead of creating the SparkConf object in code, to pass the parameters via spark.driver.extraClassPath and spark.executor.extraClassPath to spark-submit.
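As a rough sketch of what that could look like, assuming appConf is a Typesafe Config object and that the app.batchDurationSecs key is hypothetical, createStreamContext can be written with no hard-coded spark.* settings; those are then supplied at launch time (for example with spark-submit --conf ...) and picked up by the default SparkConf, while app-level values still come from the configuration file made visible through the extra classpath:

import com.typesafe.config.Config
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createStreamContext(sparkConf: SparkConf, appConf: Config): StreamingContext = {
  // sparkConf is a plain new SparkConf() with no .set(...) calls, so it only
  // reflects what was passed via spark-submit / system properties
  val batchSecs = appConf.getLong("app.batchDurationSecs")   // hypothetical key
  val ssc = new StreamingContext(sparkConf, Seconds(batchSecs))
  ssc.checkpoint(appConf.getString("spark.checkpointdirectory"))
  ssc
}

A corresponding launch might then pass the tunables and classpath entries on the command line, e.g. spark-submit --conf spark.streaming.kafka.maxRatePerPartition=100 --conf spark.executor.extraClassPath=/path/to/conf ..., though the exact flags depend on your deployment.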

