#!/cs/gmd/sw/anaconda3/bin/python
'''
This is a wrapper over the DACS web-service client.
For documentation from Refinitiv, refer -
http://dacs-emea-prod.app.hedani.net:8080/dacs-docs/wwhelp/wwhimpl/js/html/wwhelp.htm#href=DACS%20Webservices%20Guide/DACS_WebService_Prog_Guide.1.001.html
'''
import argparse
import base64
import json
import logging
import os
import secrets
import ssl
import string
from datetime import datetime

import requests
from requests.auth import HTTPBasicAuth
from suds.client import Client


class DacsWSClient:
    '''
    DACS Web Client

    Thin wrapper around a suds SOAP ``Client`` built from the DACS WSDL of
    the selected environment.  The SOAP client and the administrator login
    object are created on first use and cached on the instance.
    '''

    # Sites available
    # NOTE(review): 'RMDA_Lab_LN' does not follow the 'RMDS_' prefix used by
    # its neighbours - confirm it is not a typo before relying on it.
    allsites = [
        'RMDS_EMEA',
        'RMDS_HongKong',
        'RMDS_NY',
        'RMDS_SAOPAULO',
        'RMDS_Singapore',
        'RMDS_Tokyo',
        'Zurich',
        'RMDS_DEV_EMEA',
        'RMDA_Lab_LN',
        'RMDS_DEV_NY',
        'RMDS_Tokyo_Dev',
        'Zurich_PTA',
    ]

    # WSDL location per supported environment; the keys double as the list
    # of valid ``dacsenv`` values.
    DacsURL = {
        'DEV': 'https://global-onedacs-dev.app.hedani.net:8443/DacsWS/DacsWebServiceService?wsdl',
        'PROD': 'http://dacs-emea-prod.app.hedani.net:8080/DacsWS/DacsWebServiceService?wsdl',
    }

    # [administrator name, administrator password] used by login().
    # NOTE(review): credentials are hard-coded in source - consider loading
    # them from a protected configuration instead.
    DacsCred = ['support', 'support']

    def __init__(self, dacsenv='DEV'):
        '''
        Create a client for the given environment and configure logging.

        :param dacsenv: environment name, one of the keys of ``DacsURL``
                        ('DEV' or 'PROD').
        :raises AttributeError: if ``dacsenv`` is not a supported environment.
        '''
        if dacsenv not in self.DacsURL:
            raise AttributeError(
                "Env {} not supported. Use DEV or PROD".format(dacsenv))
        self.dacsenv = dacsenv
        self.setup_logging()
        self.logger.debug('Created DacsWSClient Object')

    def setup_logging(self):
        '''
        Route log records to a per-environment, per-day log file and store
        the module logger on ``self.logger``.
        '''
        logging.captureWarnings(True)
        today = datetime.today().strftime('%Y%m%d')
        logfile = '/cs/gmd/data/logs/DACS_API_' + self.dacsenv + '-' + today + '.log'
        # basicConfig configures the root logger; per the logging docs it is
        # a no-op when the root logger already has handlers.
        logging.basicConfig(
            filename=logfile,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            level=logging.DEBUG,
        )
        self.logger = logging.getLogger(__name__)

    def client(self):
        '''
        Return the suds ``Client`` for the selected environment, creating
        and caching it on the first call.
        '''
        if hasattr(self, '_client'):
            return self._client

        # Environment variables set before the SOAP client is built.
        # NOTE(review): presumably read by the DACS web-service tooling -
        # confirm; the keystore password is hard-coded here.
        os.environ["DACS_WEBSERVICE_KEYSTORE_LOCATION"] = '/cs/gmd/home/gmdadmin/.keystore'
        os.environ["DACS_WEBSERVICE_KEYSTORE_PASSWORD"] = "changeit"
        os.environ["DACS_WEBSERVICE_SECURE_PORT"] = "8443"
        os.environ["DACS_WEBSERVICE_SECURE"] = "FALSE"

        # SECURITY NOTE(review): this replaces the process-wide default HTTPS
        # context factory with an unverified one, i.e. certificate checks are
        # disabled for every default-context HTTPS request made by this
        # process, not only for DACS.  Left unchanged to preserve behaviour.
        try:
            unverified_context = ssl._create_unverified_context
        except AttributeError:
            # ssl module without this helper: keep the default context.
            pass
        else:
            ssl._create_default_https_context = unverified_context

        self._client = Client(self.DacsURL[self.dacsenv], timeout=900)
        return self._client

    def login(self):
        '''
        Return the ``dacsAdministratorLogin`` object populated from
        ``DacsCred``, creating and caching it on the first call.
        '''
        if hasattr(self, '_dl'):
            return self._dl

        admin_login = self.client().factory.create('dacsAdministratorLogin')
        admin_login.aAdministratorName.mAdministratorName = self.DacsCred[0]
        admin_login.aAdministratorPassword.mAdministratorPassword = self.DacsCred[1]
        self._dl = admin_login
        return self._dl

    def addAdminUser(self, *, username, password, description,
                     sites=('RMDS_DEV_EMEA',)):
        '''
        Create a DACS administrator definition for ``username``.

        Sites not listed in ``allsites`` are skipped with a warning; the
        request is sent with the remaining sites.

        :param username: administrator name to create.
        :param password: password for the new administrator.
        :param description: free-text description stored with the definition.
        :param sites: iterable of site names to assign.
        :returns: True if the service reported SUCCESS, False otherwise
                  (see parseResult).
        '''
        self.logger.debug("Request for adding user %s to sites %s",
                          username, ','.join(sites))
        client = self.client()
        admin_def = client.factory.create('dacsAdministratorDefinition')
        admin_def.mName = username
        admin_def.mPassword = password
        # admin_def.mAccessLevel = 1

        sites_to_add = []
        for site in sites:
            if site in self.allsites:
                sites_to_add.append(site)
            else:
                self.logger.warning('Site {} not supported, skipping'.format(site))
        admin_def.mSites = sites_to_add
        admin_def.mDescription = description

        result = client.service.createDacsAdministratorDefinition(
            self.login(), admin_def)
        infostr = 'addAdminUser operation for user {} to sites {}'.format(
            username, ','.join(sites_to_add))
        return self.parseResult(result, infostr)

    def parseResult(self, result, infostr):
        '''
        Record and log the outcome of a web-service call.

        The result text, result code and error text are kept on the instance
        as ``_result``, ``_errcode`` and ``_errtext``.

        :param result: service response exposing ``aResult.mResultText``,
                       ``aResult.mResultCode`` and ``aResult.mErrorText``.
        :param infostr: description of the operation, used in log messages.
        :returns: True when the result text is 'SUCCESS', otherwise False.
        '''
        outcome = result.aResult
        self._result = outcome.mResultText
        self._errcode = outcome.mResultCode
        self._errtext = outcome.mErrorText

        if self._result == 'SUCCESS':
            self.logger.info('SUCCESS: %s', infostr)
            return True

        # Lazy %-formatting keeps this safe even if the result text is None.
        self.logger.error('%s:%s', self._result, infostr)
        self.logger.error('Error Code: %s, Error String: %s',
                          self._errcode, self._errtext)
        return False

    def generateRandomPassword(self, length=16):
        '''
        Return a random alphanumeric password.

        Uses the ``secrets`` module so the value is suitable for credentials.

        :param length: number of characters to generate.
        NOTE(review): DACS password policy (length / character classes) is
        not visible here - confirm the defaults satisfy it.
        '''
        alphabet = string.ascii_letters + string.digits
        return ''.join(secrets.choice(alphabet) for _ in range(length))

    def validate_args(self, args):
        '''
        Placeholder argument validation: performs no checks and accepts
        every ``args`` value.

        :returns: always True.
        '''
        return True