Add GCS Bucket Mirrors (#26382)

This commit contains changes to support Google Cloud Storage
buckets as mirrors, meant for hosting Spack build caches. This
feature is beneficial for users running infrastructure on Google
Cloud Platform. On public cloud systems, resources are ephemeral,
and in many cases installing compilers, MPI flavors, and user
packages from scratch takes considerable time.

Giving users the ability to host a Spack mirror that stores build
caches in GCS buckets offers a clean way to reduce application
rebuilds on Google Cloud infrastructure.

Co-authored-by: Joe Schoonover <joe@fluidnumerics.com>
Author: Doug Jacobsen, 2021-10-21 22:22:38 -06:00 (committed via GitHub)
parent d024faf044
commit d1d0021647
5 changed files with 375 additions and 0 deletions
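
Not part of the diff: a minimal sketch of the intended workflow, using the utilities introduced by this commit to push a local file into a GCS-backed mirror. The bucket name and paths are hypothetical, and it assumes the google-cloud-storage package and application-default credentials are already configured.

# Hypothetical bucket/paths: push one local file into a gs:// mirror location.
import spack.util.gcs as gcs_util
import spack.util.url as url_util

# Parse a gs:// URL; GCSBlob expects the parsed form with a 'gs' scheme.
url = url_util.parse('gs://my-spack-mirror/build_cache/example.spack')

blob = gcs_util.GCSBlob(url)               # creates the bucket if it is missing
blob.upload_to_blob('/tmp/example.spack')  # upload a local build-cache entry
print(blob.exists())                       # True once the upload has completed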


@@ -1407,6 +1407,55 @@ def fetch(self):
            raise FailedDownloadError(self.url)


@fetcher
class GCSFetchStrategy(URLFetchStrategy):
    """FetchStrategy that pulls from a GCS bucket."""
    url_attr = 'gs'

    def __init__(self, *args, **kwargs):
        try:
            super(GCSFetchStrategy, self).__init__(*args, **kwargs)
        except ValueError:
            if not kwargs.get('url'):
                raise ValueError(
                    "GCSFetchStrategy requires a url for fetching.")

    @_needs_stage
    def fetch(self):
        import spack.util.web as web_util
        if self.archive_file:
            tty.debug('Already downloaded {0}'.format(self.archive_file))
            return

        parsed_url = url_util.parse(self.url)
        if parsed_url.scheme != 'gs':
            raise FetchError(
                'GCSFetchStrategy can only fetch from gs:// urls.')

        tty.debug('Fetching {0}'.format(self.url))

        basename = os.path.basename(parsed_url.path)

        with working_dir(self.stage.path):
            _, headers, stream = web_util.read_from_url(self.url)

            with open(basename, 'wb') as f:
                shutil.copyfileobj(stream, f)

            content_type = web_util.get_header(headers, 'Content-type')

        if content_type == 'text/html':
            warn_content_type_mismatch(self.archive_file or "the archive")

        if self.stage.save_filename:
            os.rename(
                os.path.join(self.stage.path, basename),
                self.stage.save_filename)

        if not self.archive_file:
            raise FailedDownloadError(self.url)


def stable_target(fetcher):
    """Returns whether the fetcher target is expected to have a stable
    checksum. This is only true if the target is a preexisting archive
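
Not part of the diff: a hedged sketch of driving the new fetch strategy directly, mirroring how the tests below exercise it. The gs:// URL and stage path are hypothetical, and credentials are assumed to be configured.

# Hypothetical URL and path: stage a file from a GCS bucket with GCSFetchStrategy.
import spack.fetch_strategy as fs
import spack.stage

fetcher = fs.GCSFetchStrategy(url='gs://my-spack-mirror/build_cache/index.json')
with spack.stage.Stage(fetcher, path='/tmp/gcs-stage') as stage:
    fetcher.fetch()              # downloads the blob into the stage directory
    print(fetcher.archive_file)  # local path of the fetched file, or None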


@@ -0,0 +1,26 @@
# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import six.moves.urllib.response as urllib_response

import spack.util.url as url_util
import spack.util.web as web_util


def gcs_open(req, *args, **kwargs):
    """Open a reader stream to a blob object on GCS
    """
    import spack.util.gcs as gcs_util

    url = url_util.parse(req.get_full_url())
    gcsblob = gcs_util.GCSBlob(url)

    if not gcsblob.exists():
        raise web_util.SpackWebError('GCS blob {0} does not exist'.format(
            gcsblob.blob_path))
    stream = gcsblob.get_blob_byte_stream()
    headers = gcsblob.get_blob_headers()

    return urllib_response.addinfourl(stream, headers, url)
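
Not part of the diff: a hedged sketch of how this handler is reached in practice. Reading a gs:// URL through spack.util.web dispatches to gcs_open, so callers only see headers and a byte stream; the URL here is hypothetical.

# Hypothetical URL: read a GCS blob through spack's web utilities.
import shutil

import spack.util.web as web_util

_, headers, stream = web_util.read_from_url('gs://my-spack-mirror/build_cache/index.json')
print(web_util.get_header(headers, 'Content-type'))

with open('index.json', 'wb') as f:
    shutil.copyfileobj(stream, f)   # copy the blob byte stream to a local file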


@@ -0,0 +1,54 @@
# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import os

import pytest

import spack.config
import spack.fetch_strategy
import spack.stage


@pytest.mark.parametrize('_fetch_method', ['curl', 'urllib'])
def test_gcsfetchstrategy_without_url(_fetch_method):
    """Ensure constructor with no URL fails."""
    with spack.config.override('config:url_fetch_method', _fetch_method):
        with pytest.raises(ValueError):
            spack.fetch_strategy.GCSFetchStrategy(None)


@pytest.mark.parametrize('_fetch_method', ['curl', 'urllib'])
def test_gcsfetchstrategy_bad_url(tmpdir, _fetch_method):
    """Ensure fetch with bad URL fails as expected."""
    testpath = str(tmpdir)

    with spack.config.override('config:url_fetch_method', _fetch_method):
        fetcher = spack.fetch_strategy.GCSFetchStrategy(url='file:///does-not-exist')
        assert fetcher is not None

        with spack.stage.Stage(fetcher, path=testpath) as stage:
            assert stage is not None
            assert fetcher.archive_file is None
            with pytest.raises(spack.fetch_strategy.FetchError):
                fetcher.fetch()


@pytest.mark.parametrize('_fetch_method', ['curl', 'urllib'])
def test_gcsfetchstrategy_downloaded(tmpdir, _fetch_method):
    """Ensure fetch with archive file already downloaded is a noop."""
    testpath = str(tmpdir)
    archive = os.path.join(testpath, 'gcs.tar.gz')

    with spack.config.override('config:url_fetch_method', _fetch_method):

        class Archived_GCSFS(spack.fetch_strategy.GCSFetchStrategy):
            @property
            def archive_file(self):
                return archive

        url = 'gcs:///{0}'.format(archive)
        fetcher = Archived_GCSFS(url=url)
        with spack.stage.Stage(fetcher, path=testpath):
            fetcher.fetch()

lib/spack/spack/util/gcs.py (new file, 215 additions)

@@ -0,0 +1,215 @@
# Copyright 2013-2021 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

"""
This file contains the definition of the GCS Blob storage Class used to
integrate GCS Blob storage with spack buildcache.
"""

import os
import sys

import llnl.util.tty as tty


def gcs_client():
    """Create a GCS client
    Creates an authenticated GCS client to access GCS buckets and blobs
    """
    try:
        import google.auth
        from google.cloud import storage
    except ImportError as ex:
        tty.error('{0}, google-cloud-storage python module is missing.'.format(ex) +
                  ' Please install to use the gs:// backend.')
        sys.exit(1)

    storage_credentials, storage_project = google.auth.default()
    storage_client = storage.Client(storage_project,
                                    storage_credentials)
    return storage_client


class GCSBucket(object):
    """GCS Bucket Object
    Create a wrapper object for a GCS Bucket. Provides methods to wrap spack
    related tasks, such as destroy.
    """
    def __init__(self, url, client=None):
        """Constructor for GCSBucket objects

        Args:
            url (str): The url pointing to the GCS bucket to build an object out of
            client (google.cloud.storage.client.Client): A pre-defined storage
                client that will be used to access the GCS bucket.
        """
        if url.scheme != 'gs':
            raise ValueError('Can not create GCS bucket connection with scheme {SCHEME}'
                             .format(SCHEME=url.scheme))
        self.url = url
        self.name = self.url.netloc
        if self.url.path[0] == '/':
            self.prefix = self.url.path[1:]
        else:
            self.prefix = self.url.path

        self.client = client or gcs_client()

        self.bucket = None
        tty.debug('New GCS bucket:')
        tty.debug(" name: {0}".format(self.name))
        tty.debug(" prefix: {0}".format(self.prefix))

    def exists(self):
        from google.cloud.exceptions import NotFound
        if not self.bucket:
            try:
                self.bucket = self.client.bucket(self.name)
            except NotFound as ex:
                tty.error("{0}, Failed check for bucket existence".format(ex))
                sys.exit(1)
        return self.bucket is not None

    def create(self):
        if not self.bucket:
            self.bucket = self.client.create_bucket(self.name)

    def get_blob(self, blob_path):
        if self.exists():
            return self.bucket.get_blob(blob_path)
        return None

    def blob(self, blob_path):
        if self.exists():
            return self.bucket.blob(blob_path)
        return None

    def get_all_blobs(self, recursive=True, relative=True):
        """Get a list of all blobs
        Returns a list of all blobs within this bucket.

        Args:
            relative: If true (default), print blob paths
                      relative to 'build_cache' directory.
                      If false, print absolute blob paths (useful for
                      destruction of bucket)
        """
        tty.debug('Getting GCS blobs... Recurse {0} -- Rel: {1}'.format(
            recursive, relative))

        converter = str
        if relative:
            converter = self._relative_blob_name

        if self.exists():
            all_blobs = self.bucket.list_blobs(prefix=self.prefix)
            blob_list = []

            base_dirs = len(self.prefix.split('/')) + 1

            for blob in all_blobs:
                if not recursive:
                    num_dirs = len(blob.name.split('/'))
                    if num_dirs <= base_dirs:
                        blob_list.append(converter(blob.name))
                else:
                    blob_list.append(converter(blob.name))

            return blob_list

    def _relative_blob_name(self, blob_name):
        return os.path.relpath(blob_name, self.prefix)

    def destroy(self, recursive=False, **kwargs):
        """Bucket destruction method

        Deletes all blobs within the bucket, and then deletes the bucket itself.

        Uses GCS Batch operations to bundle several delete operations together.
        """
        from google.cloud.exceptions import NotFound
        tty.debug("Bucket.destroy(recursive={0})".format(recursive))
        try:
            bucket_blobs = self.get_all_blobs(recursive=recursive, relative=False)
            batch_size = 1000

            num_blobs = len(bucket_blobs)
            for i in range(0, num_blobs, batch_size):
                with self.client.batch():
                    for j in range(i, min(i + batch_size, num_blobs)):
                        blob = self.blob(bucket_blobs[j])
                        blob.delete()
        except NotFound as ex:
            tty.error("{0}, Could not delete a blob in bucket {1}.".format(
                ex, self.name))
            sys.exit(1)


class GCSBlob(object):
    """GCS Blob object

    Wraps some blob methods for spack functionality
    """
    def __init__(self, url, client=None):
        self.url = url
        if url.scheme != 'gs':
            raise ValueError('Can not create GCS blob connection with scheme: {SCHEME}'
                             .format(SCHEME=url.scheme))

        self.client = client or gcs_client()

        self.bucket = GCSBucket(url)

        self.blob_path = self.url.path.lstrip('/')

        tty.debug("New GCSBlob")
        tty.debug("  blob_path = {0}".format(self.blob_path))

        if not self.bucket.exists():
            tty.warn("The bucket {0} does not exist, it will be created"
                     .format(self.bucket.name))
            self.bucket.create()

    def get(self):
        return self.bucket.get_blob(self.blob_path)

    def exists(self):
        from google.cloud.exceptions import NotFound
        try:
            blob = self.bucket.blob(self.blob_path)
            exists = blob.exists()
        except NotFound:
            return False

        return exists

    def delete_blob(self):
        from google.cloud.exceptions import NotFound
        try:
            blob = self.bucket.blob(self.blob_path)
            blob.delete()
        except NotFound as ex:
            tty.error("{0}, Could not delete gcs blob {1}".format(ex, self.blob_path))

    def upload_to_blob(self, local_file_path):
        blob = self.bucket.blob(self.blob_path)
        blob.upload_from_filename(local_file_path)

    def get_blob_byte_stream(self):
        return self.bucket.get_blob(self.blob_path).open(mode='rb')

    def get_blob_headers(self):
        blob = self.bucket.get_blob(self.blob_path)

        headers = {
            'Content-type': blob.content_type,
            'Content-encoding': blob.content_encoding,
            'Content-language': blob.content_language,
            'MD5Hash': blob.md5_hash
        }

        return headers
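
Not part of the diff: a hedged usage sketch for the bucket wrapper above. The bucket name and prefix are hypothetical, and google-cloud-storage plus application-default credentials are assumed to be available.

# Hypothetical bucket: list the contents of a build-cache prefix with GCSBucket.
import spack.util.gcs as gcs_util
import spack.util.url as url_util

url = url_util.parse('gs://my-spack-mirror/build_cache')
bucket = gcs_util.GCSBucket(url)

if bucket.exists():
    # Blob names come back relative to the 'build_cache' prefix by default.
    for name in bucket.get_all_blobs(recursive=True, relative=True):
        print(name)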


@@ -28,6 +28,7 @@
import spack.error
import spack.url
import spack.util.crypto
import spack.util.gcs as gcs_util
import spack.util.s3 as s3_util
import spack.util.url as url_util
from spack.util.compression import ALLOWED_ARCHIVE_TYPES
@@ -74,6 +75,10 @@ def uses_ssl(parsed_url):
        if url_util.parse(endpoint_url, scheme='https').scheme == 'https':
            return True

    elif parsed_url.scheme == 'gs':
        tty.debug("(uses_ssl) GCS Blob is https")
        return True

    return False
@@ -195,6 +200,12 @@ def push_to_url(
            if not keep_original:
                os.remove(local_file_path)

    elif remote_url.scheme == 'gs':
        gcs = gcs_util.GCSBlob(remote_url)
        gcs.upload_to_blob(local_file_path)

        if not keep_original:
            os.remove(local_file_path)

    else:
        raise NotImplementedError(
            'Unrecognized URL scheme: {SCHEME}'.format(
@@ -217,6 +228,10 @@ def url_exists(url):
                return False
            raise err

    elif url.scheme == 'gs':
        gcs = gcs_util.GCSBlob(url)
        return gcs.exists()

    # otherwise, just try to "read" from the URL, and assume that *any*
    # non-throwing response contains the resource represented by the URL
    try:
@@ -279,6 +294,15 @@ def remove_url(url, recursive=False):
        s3.delete_object(Bucket=bucket, Key=url.path.lstrip('/'))
        return

    elif url.scheme == 'gs':
        if recursive:
            bucket = gcs_util.GCSBucket(url)
            bucket.destroy(recursive=recursive)
        else:
            blob = gcs_util.GCSBlob(url)
            blob.delete_blob()
        return

    # Don't even try for other URL schemes.
@@ -358,6 +382,10 @@ def list_url(url, recursive=False):
            key.split('/', 1)[0]
            for key in _iter_s3_prefix(s3, url)))

    elif url.scheme == 'gs':
        gcs = gcs_util.GCSBucket(url)
        return gcs.get_all_blobs(recursive=recursive)


def spider(root_urls, depth=0, concurrency=32):
    """Get web pages from root URLs.
@@ -516,6 +544,9 @@ def _urlopen(req, *args, **kwargs):
    if url_util.parse(url).scheme == 's3':
        import spack.s3_handler
        opener = spack.s3_handler.open
    elif url_util.parse(url).scheme == 'gs':
        import spack.gcs_handler
        opener = spack.gcs_handler.gcs_open

    try:
        return opener(req, *args, **kwargs)
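
Not part of the diff: a hedged sketch of the higher-level spack.util.web entry points that now dispatch on the gs:// scheme. The mirror URL and file names are hypothetical, and the exact push_to_url keyword arguments are assumed from the surrounding hunks.

# Hypothetical mirror URL: exercise the web utilities that now understand gs://.
import spack.util.web as web_util

mirror = 'gs://my-spack-mirror/build_cache'

web_util.push_to_url('/tmp/index.json', mirror + '/index.json', keep_original=True)
print(web_util.url_exists(mirror + '/index.json'))   # True after the push
print(web_util.list_url(mirror, recursive=True))     # blob names under the prefix
web_util.remove_url(mirror + '/index.json')          # delete a single blob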