from __future__ import print_function

import datetime
import logging
import os
import threading
import time

import mastcasjobs
from astropy.io import ascii


def writeTheCSVFile(att, newFN):
    # Runs in a background thread so the next CasJobs query can be
    # submitted while the previous chunk is still being written to disk.
    logger.info("WRITESTARTED: " + newFN)
    ascii.write(att, newFN, format='fast_csv', overwrite=True)
    logger.info("WRITEENDED: ")


main_thread = threading.current_thread()

# Table to export from PanSTARRS DR2, paged on (objID, uniquePspsSTid).
tableToDownload = "StackObjectThin"
columnsToDownload = "*"
mainID = "objID"
otherID = "uniquePspsSTid"
orderByStr = mainID + "," + otherID
outputDir = "/ini/mar/lsst-mike/PS1DR2/" + tableToDownload
newDir = "/disk71/hips/lsst-mike/PS1DR2/" + tableToDownload

logger = logging.getLogger('myLog')
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s",
                              "%Y-%m-%d %H:%M:%S")
logger.setLevel(logging.INFO)
fH = logging.FileHandler(os.path.join(
    'logs',
    tableToDownload + "_"
    + datetime.datetime.now().strftime('%Y-%m-%d--%H-%M-%S') + '_table.log'))
fH.setFormatter(formatter)
logger.addHandler(fH)

rowsRequested = 1000000
rowsReturned = rowsRequested

# Keyset cursor: each query fetches rows strictly after (objIDLimit,
# otherIDLimit). Use the -999999999 sentinels to start from the beginning
# of the table; the values below resume a previously interrupted run.
#objIDLimit = -999999999
#otherIDLimit = -999999999
objIDLimit = 175053053195565170
otherIDLimit = 3750402000018777

wsid = 657237824
un = "MikeRead"
pwd = "****"
jobs = mastcasjobs.MastCasJobs(username=un, password=pwd,
                               context="PanSTARRS_DR2")

loop = 0
while rowsReturned == rowsRequested:
    loop = loop + 1
    myDBTable = ("mr_" + tableToDownload + "_"
                 + str(objIDLimit).replace("-999999999", "0"))
    whereStr = (" where " + mainID + " > " + str(objIDLimit)
                + " or ( " + mainID + "=" + str(objIDLimit)
                + " and " + otherID + " > " + str(otherIDLimit) + " ) ")
    query = ("select top " + str(rowsRequested) + " " + columnsToDownload
             + " into " + myDBTable + " from " + tableToDownload
             + whereStr + " order by " + orderByStr)
    logger.info("LOOP=" + str(loop))
    logger.info("QUERY=" + query)
    logger.info("MYDBTABLE=" + myDBTable)
    logger.info("OBJID=" + str(objIDLimit))
    logger.info("OTHERID=" + str(otherIDLimit))

    # Drop any leftover MyDB table from an earlier, interrupted run.
    logger.info("DROPPINGINCASE=" + myDBTable)
    try:
        jobs.drop_table(table=myDBTable)
    except Exception as e:
        logger.info("FAILED=" + str(e))

    # Submit the query, retrying with a capped linear backoff.
    logger.info("SUBMITTINGQUERY=" + query)
    checkSuccess = 0
    noCheckFails = 0
    jobID = -9999
    while checkSuccess == 0:
        try:
            jobID = jobs.submit(q=query)
            checkSuccess = 1
            logger.info("QPASSED=")
        except Exception as e:
            noCheckFails = noCheckFails + 1
            logger.info("FAILED=" + str(noCheckFails))
            logger.error("ERROR=" + str(e))
            time.sleep(min(60, noCheckFails))
    logger.info("QUERYSUBMITTED=" + query)
    logger.info("JOBID=" + str(jobID))

    # Block until the job finishes, with the same retry pattern.
    logger.info("STARTMONITOR=")
    checkSuccess = 0
    noCheckFails = 0
    myStatus = -9999
    while checkSuccess == 0:
        try:
            myStatus = jobs.monitor(jobID)
            checkSuccess = 1
            logger.info("MPASSED=")
        except Exception as e:
            noCheckFails = noCheckFails + 1
            logger.info("FAILED=" + str(noCheckFails))
            logger.error("ERROR=" + str(e))
            time.sleep(min(60, noCheckFails))
    logger.info("ENDMONITOR=" + str(myStatus))

    oFN = outputDir + "/" + myDBTable + '.csv'
    nFN = newDir + "/" + myDBTable + '.csv'

    # Download the chunk as CSV and read it back, retrying on failure.
    logger.info("FETCHINGTABLE=" + myDBTable + " to " + oFN)
    getCount = 0
    getSuccess = 0
    while getSuccess == 0:
        try:
            logger.info("TRYINGGET:")
            time.sleep(0.5)
            jobs.request_and_get_output(table=myDBTable, outtype='CSV',
                                        outfn=oFN)
            logger.info("GOTFILE:")
            logger.info("READINGFILE:")
            data = ascii.read(oFN, format='csv', fast_reader=True)
            # Update the chunk size here: on the final chunk it is smaller
            # than rowsRequested, which is what ends the outer loop.
            rowsReturned = len(data)
            if rowsReturned > 0:
                beforeSortObjID = data[mainID][rowsReturned - 1]
                logger.info("LASTOBJIDBEFORESORT=" + str(beforeSortObjID))
            getSuccess = 1
        except Exception as e:
            logger.info("GETFAILED=" + str(e))
            getCount = getCount + 1
            time.sleep(min(60, getCount))
            logger.info("GETFAILED=" + str(getCount))
    logger.info("TABLEFETCHED=" + myDBTable + " to " + oFN)

    # Drop the server-side table now that the CSV is safely on disk.
    logger.info("DROPPINGTABLE=" + myDBTable)
    try:
        jobs.drop_table(table=myDBTable)
    except Exception as e:
        logger.info("FAILED=" + str(e))
    logger.info("TABLEDROPPED=" + myDBTable)

    if rowsReturned == 0:
        # Empty final chunk: nothing left to page through.
        break

    # Sort locally and advance the keyset cursor to the last row.
    data.sort([mainID, otherID])
    lastObjID = data[mainID][rowsReturned - 1]
    lastOtherID = data[otherID][rowsReturned - 1]
    maxObjID = max(data[mainID])
    logger.info("MAXOBJID=" + str(maxObjID))
    logger.info("LASTOBJID=" + str(lastObjID))
    logger.info("LASTOTHERID=" + str(lastOtherID))
    if maxObjID == lastObjID:
        logger.info("OBJIDSMATCH=TRUE")
    else:
        logger.info("OBJIDSMATCH=FALSE")

    # Wait for the previous writer thread (if any) before spawning a new one,
    # so at most one chunk is being written to disk at a time.
    for t in threading.enumerate():
        logger.info(t.name)
        if t is main_thread:
            continue
        logger.debug('joining %s', t.name)
        t.join()
    rt = threading.Thread(target=writeTheCSVFile, args=(data, nFN),
                          name="fileWriter")
    logger.info("SPAWNING: ")
    rt.start()
    logger.info("WRITERSPAWNED: ")
    #print(lastObjID, lastOtherID, maxObjID)

    objIDLimit = maxObjID
    otherIDLimit = lastOtherID

    # Enough is enough: hard stop as a safety net.
    if loop == 1000000:
        rowsReturned = 0