[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r18219 - in gnunet-update: gnunet_update test
From: |
gnunet |
Subject: |
[GNUnet-SVN] r18219 - in gnunet-update: gnunet_update test |
Date: |
Sat, 19 Nov 2011 19:52:25 +0100 |
Author: harsha
Date: 2011-11-19 19:52:25 +0100 (Sat, 19 Nov 2011)
New Revision: 18219
Modified:
gnunet-update/gnunet_update/install.py
gnunet-update/gnunet_update/package.py
gnunet-update/gnunet_update/util.py
gnunet-update/test/test_util.py
Log:
install time metadata signature validation and install manifest
Modified: gnunet-update/gnunet_update/install.py
===================================================================
--- gnunet-update/gnunet_update/install.py 2011-11-19 14:35:25 UTC (rev
18218)
+++ gnunet-update/gnunet_update/install.py 2011-11-19 18:52:25 UTC (rev
18219)
@@ -28,16 +28,19 @@
import getopt
import platform
import subprocess
+import tempfile
+import shutil
import util
from metadata import Metadata
from dependency import Dependency
+from config import GnunetUpdateConfig
def usage():
"""Print helpful usage information."""
print """
-Usage arguments: [options] <metadata_file> <package_file> </install/location>
+Usage arguments: [options] <package_file> </install/location>
This script tries to install the contents in the package file in the given
location.
@@ -65,9 +68,9 @@
search for it while linking
""")
-def get_installed_deps():
+def get_available_libs():
"""Finds from `ldconfig -v' the installed deps."""
- global installed_deps
+ global available_libs
proc = subprocess.Popen(["ldconfig", "-p"], stdout=subprocess.PIPE)
(ldconfig_output, ldconfig_err) = proc.communicate()
proc.stdout.close()
@@ -91,30 +94,57 @@
usage()
sys.exit(2)
- if len(args) != 3:
+ if len(args) != 2:
print "Incorrect number of arguments"
usage()
sys.exit(1)
- metadata = Metadata()
- metadata.read_from_file(args[0])
- package_tarfile = tarfile.open(args[1],'r')
- install_dir = args[2]
-
+ config = GnunetUpdateConfig() # Configuration
+ package_tarfile = tarfile.open(args[0],'r')
+ install_dir = args[1]
+ # Check for the metadata file and its signature in the given tarfile
try:
metadata_tarinfo = package_tarfile.getmember("metadata.dat")
- except KeyError, err:
- print err
- print "Metadata not found in the given tarfile. Quitting"
+ metadata_sig_tarinfo = package_tarfile.getmember("metadata.dat.asc")
+ except KeyError as no_file:
+ print no_file + " not found in the given tarfile. Quitting"
package_tarfile.close()
sys.exit(2)
-
+
+ # Temporary directory for package extraction
+ temp_dir = tempfile.mkdtemp()
+ package_tarfile.extract(metadata_tarinfo, temp_dir)
+ package_tarfile.extract(metadata_sig_tarinfo, temp_dir)
+
+
+ # Verify metadata signature
+ metadata_fd = open(os.path.join(temp_dir, metadata_tarinfo.name), "rb")
+ metadata_sig_fd = open(os.path.join(temp_dir, metadata_sig_tarinfo.name),
"rb")
+
+ sig = util.gpg_verify_sign(metadata_fd,
+ metadata_sig_fd,
+ config.get('SECURITY', 'PGP_SIGN_KEY'),
+ detached=True)
+ metadata_sig_fd.close()
+ metadata_fd.close()
+
+ if sig[0].status is not None:
+ print "Error verifying the signature of metadata: " + sig[0].status[2]
+ shutil.rmtree(temp_dir)
+ package_tarfile.close()
+ sys.exit(2)
+
+ metadata = Metadata()
+ metadata.read_from_file(os.path.join(temp_dir, metadata_tarinfo.name))
+ shutil.rmtree(temp_dir)
+
#check whether the system and machine architecture match
host_system = platform.system()
host_machine = platform.machine()
if metadata.system != host_system or metadata.machine != host_machine:
print "The given package is not suited for this platform."
+ package_tarfile.close()
sys.exit(1)
#Platform check is done; now unpack the tarfile into destination directory
@@ -123,27 +153,33 @@
except OSError: # Given directory not present
os.mkdir(install_dir, 0755)
- installed_deps = get_installed_deps() # already available dependencies
+ available_libs = get_available_libs() # already available dependencies
new_binary_objects = metadata.binary_objects # To be installed objects
- installed_deps.sort(key=(lambda dep: dep.name))
+ available_libs.sort(key=(lambda dep: dep.name))
+ installed_files = list() # List of files that are installed from package
+
needed_deps = metadata.dependencies
+ # FIXME: Add filtering in config
dep_filter = ["/lib/ld-linux.so.2"]
to_be_installed_deps = [(dep) for dep in needed_deps
- if dep not in installed_deps and
+ if dep not in available_libs and
dep.name not in dep_filter]
#FIXME: Security warning! Perhaps we should examin the contents of tarfile
#before extracting
for member in package_tarfile.getmembers():
- if ("metadata.dat" == member.name or "install-prefix" == member.name
or
- member.name.startswith("dependencies/")):
+ if ("metadata.dat" == member.name or
+ "metadata.dat.asc" == member.name or
+ "install-prefix" == member.name or
+ member.name.startswith("dependencies/")):
continue
elif member.name.startswith("install-prefix"):
# Remove the `install-prefix' directory
member.name = member.name.replace("install-prefix/","",1)
package_tarfile.extract(member, install_dir)
+ installed_files.append(member.name)
# Install the needed dependencies from tarfile
dep_dir = install_dir + "/lib/gnunet-deps"
@@ -159,18 +195,40 @@
# Remove the `dependencies/' in member.name
dep_tarinfo.name = dep_tarinfo.name.replace("dependencies/","",1)
package_tarfile.extract(dep_tarinfo, "./")
+ installed_files.append(os.path.join(dep_dir, dep_tarinfor.name))
+ # Check the hash of the extracted file
+ if sha512_hexdigest(dep_tarinfo.name) != dep.hash:
+ print (dep_tarinfo.name +
+ " not matched with the expected hash " + dep.hash)
+ print "Given package contains code not signed by trusted packager"
+ print "Installation failed due to security reasons."
+ package_tarfile.close()
+ os.chdir(orig_working_dir)
+ shutil.rmtree(install_dir)
+ sys.exit(0)
+
# Generate symbolic link from dep.name to dep.realname
# NOTE: Available only on Unix type systems!
if os.path.exists(dep.name):
os.remove(dep.name)
os.symlink(dep.realname, dep.name)
+ installed_files.append(os.path.join(dep_dir, dep.name))
+ package_tarfile.close()
+
# run ldconfig -n in the dep_dir
proc = subprocess.Popen(["ldconfig", "-n"])
proc.wait()
os.chdir(orig_working_dir)
- package_tarfile.close()
+ if not os.path.exists(os.path.join(install_dir, "share/gnunet-update/")):
+ os.makedirs(os.path.join(install_dir, "share/gnunet-update/"))
+ install_manifest_fd = open(os.path.join(install_dir,
+
"share/gnunet-update/install-manifest"),
+ "wb")
+ map((lambda name: install_manifest_fd.write(name + '\n')),
+ installed_files)
+ install_manifest_fd.close()
print "Installation Successful!"
print "GNUNET has been installed at: " + install_dir
Modified: gnunet-update/gnunet_update/package.py
===================================================================
--- gnunet-update/gnunet_update/package.py 2011-11-19 14:35:25 UTC (rev
18218)
+++ gnunet-update/gnunet_update/package.py 2011-11-19 18:52:25 UTC (rev
18219)
@@ -206,8 +206,9 @@
#generate the metadata file and add it to tar
metadata_file = metadata.write_to_file(package_file + ".meta")
#generate the metadata file signature
+ metadata_sig_file = metadata_file + ".asc"
metadata_fd = open(metadata_file, "rb")
- metadata_sig_fd = open(metadata_file + ".asc", "wb")
+ metadata_sig_fd = open(metadata_sig_file, "wb")
skey_passphrase = config.get('SECURITY', 'PGP_SIGN_KEY_PASSWORD')
if skey_passphrase is None:
# FIXME: Hide the characters while typing password
@@ -223,6 +224,10 @@
metadata_sig_fd.close()
tar_file.add(metadata_file, "metadata.dat")
+ tar_file.add(metadata_sig_file, "metadata.dat.asc")
+
+ os.remove(metadata_file)
+ os.remove(metadata_sig_file)
print "Here are the dependencies:"
for dep in dependencies:
Modified: gnunet-update/gnunet_update/util.py
===================================================================
--- gnunet-update/gnunet_update/util.py 2011-11-19 14:35:25 UTC (rev 18218)
+++ gnunet-update/gnunet_update/util.py 2011-11-19 18:52:25 UTC (rev 18219)
@@ -106,3 +106,27 @@
gpgme.SIG_MODE_CLEAR if detached is False
else gpgme.SIG_MODE_DETACH)
return new_sigs
+
+def gpg_verify_sign(plain_fd, sign_fd, key_fpr, detached=False):
+ """Verifys whether the signature is valid for the given input.
+
+ plain_fd: File like object pointing to the original file for which the
+ given signature is made if the signature is detached; in case of
+ undetached signatures it should point to an empty file into
+ which the original content is restored from the signature
+ sign_fd: File like object pointing to the signature
+ key_fpr: The fingerprint of the key that should be used for verifying
+ detached: If detached is false, then the original content is restored from
+ the signature in to the file pointed by plain_fd
+ """
+ ctx = gpgme.Context()
+ ctx.armor = True
+ key = ctx.get_key(key_fpr.replace(' ',''))
+ if detached is False:
+ new_sigs = ctx.verify(sign_fd, None, plain_fd)
+ else:
+ new_sigs = ctx.verify(sign_fd, plain_fd, None)
+
+
+
+ return new_sigs
Modified: gnunet-update/test/test_util.py
===================================================================
--- gnunet-update/test/test_util.py 2011-11-19 14:35:25 UTC (rev 18218)
+++ gnunet-update/test/test_util.py 2011-11-19 18:52:25 UTC (rev 18219)
@@ -150,13 +150,18 @@
# Now verify the signature
signature.seek(0)
plaintext = StringIO()
- sigs = ctx.verify(signature, None, plaintext)
+ # sigs = ctx.verify(signature, None, plaintext)
+ sigs = util.gpg_verify_sign(plaintext,
+ signature,
+ key_fpr,
+ detached=False)
self.assertEqual(plaintext.getvalue(), self.sample_test_data)
self.assertEqual(len(sigs),1)
self.assertEqual(sigs[0].fpr, key_fpr)
self.assertEqual(sigs[0].status, None)
self.assertEqual(sigs[0].wrong_key_usage, False)
- # Verify detached signature
+
+ # Verifying detached signature
plaintext.seek(0)
signature = StringIO()
sigs = util.gpg_sign_file(plaintext,
@@ -171,11 +176,26 @@
# Now verify the signature
signature.seek(0)
plaintext.seek(0)
- sigs = ctx.verify(signature, plaintext, None)
+ # sigs = ctx.verify(signature, plaintext, None)
+ sigs = util.gpg_verify_sign(plaintext,
+ signature,
+ key_fpr,
+ detached=True)
self.assertEqual(len(sigs),1)
self.assertEqual(sigs[0].fpr, key_fpr)
self.assertEqual(sigs[0].status, None)
self.assertEqual(sigs[0].wrong_key_usage, False)
+
+ # verify whether bad signature is caught
+ signature.seek(0)
+ plaintext = StringIO("This text should raise an error")
+ sigs = util.gpg_verify_sign(plaintext,
+ signature,
+ key_fpr,
+ detached=True)
+ self.assertEqual(len(sigs),1)
+ self.assertEqual(sigs[0].status[2], 'Bad signature')
+ self.assertEqual(sigs[0].wrong_key_usage, False)
shutil.rmtree(temp_gpghome);
if __name__ == '__main__':
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r18219 - in gnunet-update: gnunet_update test,
gnunet <=