# Copyright (c) 2021. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Authors: Michael Bouzinier, Eugene Pokidov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import logging
import os
import subprocess
import tempfile
from typing import Any, Dict, Optional
import yaml
from nsaph_utils.docutils.md_creator import MDCreator
# Template of the companion "source" page written next to every generated
# Markdown file. ``{name}`` and ``{path}`` are filled in with ``str.format``;
# ``{literalinclude}`` is substituted with itself by the caller so that the
# directive name survives formatting.
cwl_src_template = "\n".join((
    "---",
    "orphan: true",
    "---",
    "# {name}",
    "```{literalinclude} ../../src/cwl/{path}",
    "---",
    "language: yaml",
    "---",
    "```",
    "",  # keeps the trailing newline of the page
))
# Module logger; records are sent to a stream handler created with defaults.
logger = logging.getLogger('script')
logger.addHandler(logging.StreamHandler())

# Command line interface: input location, output directory, verbosity switch.
arg_parser = argparse.ArgumentParser(description='Convert CWL files into Markdown')
arg_parser.add_argument(
    '-i',
    '--input-dir',
    dest='input_dir',
    required=True,
    type=str,
    help='An input dir with CWL files',
)
arg_parser.add_argument(
    '-o',
    '--output-dir',
    dest='output_dir',
    required=True,
    type=str,
    help='An output dir for Markdown files',
)
arg_parser.add_argument(
    '-v',
    '--verbose',
    action='store_true',
    dest='verbose',
    help='An extended logging',
)
class CWLParser:
    """Generate a Markdown page describing a single CWL document.

    The CWL file is read and parsed as YAML on construction; :meth:`parse`
    then emits the page through ``MDCreator``. Side products are a companion
    ``*cwl_src.md`` page that includes the CWL source and, for workflows, a
    PNG rendering of the workflow graph. Inline sub-workflows are rendered
    into numbered sibling pages by nested ``CWLParser`` instances.
    """

    def __init__(self, input_file_path: str, output_file_path: str, image_file_path: str):
        """Load the CWL file and prepare the Markdown writer.

        :param input_file_path: path to the CWL file to document
        :param output_file_path: path of the Markdown file to produce
        :param image_file_path: path of the PNG file for the workflow graph
        """
        logger.info(output_file_path)
        self.input_file_path = input_file_path
        with open(input_file_path, 'r') as cwl_file:
            self.raw_content = cwl_file.read()
        self.yaml_content = yaml.safe_load(self.raw_content)
        self.output_file_path = output_file_path
        self.image_file_path = image_file_path
        self.md_file = MDCreator(file_name=self.output_file_path)
        # Number of inline sub-workflows rendered so far; used to name their pages.
        self.sub_workflow_counter = 0

    def parse(self):
        """Emit all sections of the page in order and save the Markdown file."""
        self._add_title()
        self._add_source()
        self._add_image()
        self._add_contents()
        self._add_header()
        self._add_docs()
        self._add_inputs()
        self._add_outputs()
        self._add_steps()
        self.md_file.save()

    @staticmethod
    def _replace_extension(path: str, replacement: str) -> str:
        """Return *path* with its final extension swapped for *replacement*.

        Only the trailing extension is touched, so a ``.md`` fragment that
        happens to occur in a directory name is left alone.
        """
        return os.path.splitext(path)[0] + replacement

    @staticmethod
    def _named_entries(section) -> list:
        """Return ``(name, definition)`` pairs of a CWL section.

        A section may be written as a mapping keyed by name or as a list of
        objects that carry their name under ``id``; an empty or missing
        section yields no entries.
        """
        if not section:
            return []
        if isinstance(section, dict):
            return list(section.items())
        return [(entry['id'], entry) for entry in section]

    def _add_title(self):
        """Add the level-1 header: the ``###`` title line or the CWL file name."""
        title = self._find_title(self.raw_content)
        if not title:
            page_name = self._get_filename(self.output_file_path)
            title = self._replace_extension(page_name, '.cwl')
        self.md_file.add_header(text=title, level=1)

    def _add_source(self):
        """Write the companion source page and link to it from the main page."""
        source_page = self._replace_extension(self.output_file_path, 'cwl_src.md')
        cwl_name = os.path.basename(self.input_file_path)
        # ``literalinclude`` is replaced with itself so that the directive
        # name in the template survives ``str.format``.
        content = cwl_src_template.format(
            name=cwl_name, path=cwl_name, literalinclude="{literalinclude}"
        )
        with open(source_page, "w") as out:
            out.write(content)
        self.md_file.add_text("\n [Source code]({}) \n".format(
            os.path.basename(source_page)
        ))

    @staticmethod
    def _find_title(content: str) -> Optional[str]:
        """Return the text of the first line starting with ``###``, if any."""
        for line in content.splitlines():
            if line.startswith('###'):
                return line.replace('###', '').strip()
        return None

    @staticmethod
    def _get_filename(file_path: str) -> str:
        """Return the last path component of *file_path*."""
        return os.path.split(file_path)[1]

    def _add_contents(self):
        """Add a ``contents`` directive restricted to the local page."""
        contents = [
            '```{contents}',
            '---',
            'local:',
            '---',
            '```',
        ]
        self.md_file.add_text('\n'.join(contents))

    def _add_image(self):
        """Render the workflow graph to PNG and reference it from the page.

        Does nothing for documents whose class is not a workflow. Rendering
        is best effort: a failure is logged and the image is still referenced.
        """
        if 'workflow' not in self.yaml_content['class'].lower():
            return
        # Equivalent of ``cwl-runner --print-dot IN | dot -Tpng > OUT`` built
        # without a shell, so paths containing spaces or shell metacharacters
        # are passed through verbatim.
        try:
            with open(self.image_file_path, 'wb') as image_file:
                producer = subprocess.Popen(
                    ['cwl-runner', '--print-dot', self.input_file_path],
                    stdout=subprocess.PIPE,
                )
                try:
                    renderer = subprocess.run(
                        ['dot', '-Tpng'], stdin=producer.stdout, stdout=image_file
                    )
                finally:
                    # Closing our copy of the pipe lets the producer terminate
                    # if the consumer went away early.
                    producer.stdout.close()
                    producer.wait()
            if producer.returncode != 0 or renderer.returncode != 0:
                logger.warning(
                    'Rendering the workflow graph of %s failed (cwl-runner: %s, dot: %s)',
                    self.input_file_path, producer.returncode, renderer.returncode,
                )
        except OSError as error:
            logger.warning(
                'Could not render the workflow graph of %s: %s',
                self.input_file_path, error,
            )
        self.md_file.add_image(os.path.basename(self.image_file_path))

    def _add_header(self):
        """Add a one-line summary of the document class."""
        cwl_class = self.yaml_content['class'].lower()
        if 'expressiontool' in cwl_class:
            header = '**JavaScript Tool** '
        elif 'tool' in cwl_class:
            if 'baseCommand' in self.yaml_content:
                header = '**Tool**. Runs: ' + self._extract_tool(
                    str(self.yaml_content['baseCommand'])
                )
            else:
                # A tool without ``baseCommand`` has nothing to link to.
                header = '**Tool**'
        elif 'workflow' in cwl_class:
            header = '**Workflow**'
        else:
            header = '**Unknown class**'
        self.md_file.add_text(header)

    def _extract_tool(self, base_command: str) -> str:
        """Return the tool name, as a Markdown link when its docs are known.

        :param base_command: ``str()`` of the ``baseCommand`` value (a string
            or a list); the last element is taken as the tool name
        :return: ``[tool](relative/doc/path)`` for ``nsaph``, ``nsaph_utils``
            or a package whose directory exists three levels above the output
            directory; otherwise the bare tool name
        """
        # Strip the list punctuation left by str(list) and keep the last item.
        tool = base_command.replace('[', '').replace(']', '').replace("'", '').split(', ')[-1]
        parts = tool.split('.')
        package = parts[0]
        docroot = os.path.join("..", "..", "..")
        docpath = os.path.join("doc", "members")
        output_dir = os.path.dirname(self.output_file_path)
        if package == "nsaph":
            docdir = "core-platform"
        elif package == "nsaph_utils":
            docdir = "utils"
        elif os.path.isdir(os.path.join(output_dir, docroot, package)):
            docdir = package
        else:
            return tool
        doc = os.path.join(docroot, docdir, docpath, parts[-1])
        return f'[{tool}]({doc})'

    def _add_docs(self):
        """Add the ``Description`` section when the document has a ``doc``."""
        if 'doc' not in self.yaml_content:
            return
        self.md_file.add_header(text='Description', level=2)
        self.md_file.add_text(self.yaml_content['doc'])

    def _add_inputs(self):
        """Add the ``Inputs`` table (name, type, default, description)."""
        self.md_file.add_header(text='Inputs', level=2)
        rows = [('Name', 'Type', 'Default', 'Description')]
        for name, arg in self._named_entries(self.yaml_content.get('inputs')):
            doc = tp = ''
            default = None
            if isinstance(arg, str):
                # Shorthand ``name: type``: the name doubles as description.
                doc = name
                tp = arg.replace('?', '')
            elif isinstance(arg, dict):
                doc = arg.get('doc', ' ').replace('\n', ' ')
                tp = arg.get('type', 'string')
                default = arg.get('default', None)
            # Falsy defaults such as 0 or False are real values and must be
            # shown; only an absent (or empty-string) default is left blank.
            if default is None or default == '':
                shown_default = ' '
            else:
                shown_default = f'`{default}`'
            rows.append((name, tp, shown_default, doc))
        self.md_file.add_table(data=rows)

    def _add_outputs(self):
        """Add the ``Outputs`` table (name, type, description)."""
        self.md_file.add_header(text='Outputs', level=2)
        rows = [('Name', 'Type', 'Description')]
        for name, arg in self._named_entries(self.yaml_content.get('outputs')):
            if isinstance(arg, dict):
                doc = arg.get('doc', ' ').replace('\n', ' ')
                tp = arg.get('type', 'string')
            else:
                # Shorthand ``name: type``, handled the same way as for inputs.
                doc = name
                tp = arg
            if isinstance(tp, str):
                tp = tp.replace('?', '')
            elif isinstance(tp, dict):
                tp = tp['type']
            rows.append((name, tp, doc))
        self.md_file.add_table(data=rows)

    def _add_steps(self):
        """Add the ``Steps`` table; inline sub-workflows get their own pages."""
        if 'steps' not in self.yaml_content:
            return
        self.md_file.add_header(text='Steps', level=2)
        rows = [('Name', 'Runs', 'Description')]
        for name, step in self._named_entries(self.yaml_content['steps']):
            doc = step.get('doc', ' ').replace('\n', ' ')
            runs = step['run']
            if isinstance(runs, str):
                # Reference to another CWL file: link to its generated page.
                ref_uri = runs.replace('.cwl', '.md')
                target = f'[{runs}]({ref_uri})'
            else:
                run_class = (runs.get('class') or '').lower()
                if run_class == 'workflow':
                    file_name = self._handle_sub_workflow(name, runs)
                    target = f"[sub-workflow]({file_name})"
                elif run_class == 'expressiontool':
                    target = "Evaluates JavaScript expression"
                else:
                    target = runs.get('baseCommand', 'command')
            rows.append((name, target, doc))
        self.md_file.add_table(data=rows)

    def _handle_sub_workflow(self, workflow_name: str, data: Dict[str, Any]) -> str:
        """Render an inline sub-workflow into its own page.

        The sub-workflow is dumped, together with the parent's ``cwlVersion``
        and (when present) ``requirements``, into a temporary CWL file next to
        the input file and processed by a nested ``CWLParser``.

        :param workflow_name: name of the step that runs the sub-workflow
        :param data: the inline ``run`` definition of that step
        :return: file name (without directory) of the generated Markdown page
        """
        prepared_data = {**data, 'cwlVersion': self.yaml_content['cwlVersion']}
        # ``requirements`` is optional in the parent; copy it only if present.
        if 'requirements' in self.yaml_content:
            prepared_data['requirements'] = self.yaml_content['requirements']

        pipeline_file_name = self._get_filename(self.input_file_path)
        # The ``###`` line is picked up as the page title by ``_find_title``.
        header = f'### Sub-workflow *{workflow_name}* from {pipeline_file_name} \n\n'

        source_dir, basename = os.path.split(self.input_file_path)
        with tempfile.NamedTemporaryFile(mode='w+', dir=source_dir, suffix=basename) as temp_file:
            temp_file.write(header)
            yaml.safe_dump(prepared_data, temp_file)
            # The nested parser reopens the file by name, so everything must
            # be on disk before it is constructed.
            temp_file.flush()

            self.sub_workflow_counter += 1
            suffix = f'_{self.sub_workflow_counter}'
            output_file_path = self._replace_extension(self.output_file_path, suffix + '.md')
            image_file_path = self._replace_extension(self.output_file_path, suffix + '.png')
            CWLParser(temp_file.name, output_file_path, image_file_path).parse()

        return self._get_filename(output_file_path)
def main():
    """Convert CWL files given on the command line into Markdown pages.

    ``--input-dir`` may name a directory (every entry with a ``.cwl``
    extension, compared case-insensitively, is converted) or a single file.
    For each CWL file ``<stem>.md`` and ``<stem>.png`` paths inside
    ``--output-dir`` are handed to ``CWLParser``.

    :raises ValueError: if the input location is neither a file nor a directory
    """
    args = arg_parser.parse_args()
    if args.verbose:
        logger.setLevel(logging.INFO)

    if os.path.isdir(args.input_dir):
        input_dir = os.path.abspath(args.input_dir)
        # Sorted so that files are processed in a deterministic order.
        cwl_files = [
            os.path.join(input_dir, file_name)
            for file_name in sorted(os.listdir(input_dir))
        ]
    elif os.path.isfile(args.input_dir):
        cwl_files = [args.input_dir]
    else:
        raise ValueError("No such file or directory: " + args.input_dir)

    output_dir = os.path.abspath(args.output_dir)
    # The pages are opened for writing directly, so the directory must exist.
    os.makedirs(output_dir, exist_ok=True)

    for input_file_path in cwl_files:
        stem, extension = os.path.splitext(os.path.basename(input_file_path))
        if extension.lower() != '.cwl':
            continue
        # Derive both names from the stem: a case-sensitive textual replace of
        # '.cwl' would leave e.g. 'FOO.CWL' unchanged and make the Markdown
        # and image paths collide.
        output_file_path = os.path.join(output_dir, stem + '.md')
        image_file_path = os.path.join(output_dir, stem + '.png')
        CWLParser(input_file_path, output_file_path, image_file_path).parse()


if __name__ == '__main__':
    main()