方法1:使用remote_api的
如何:寫一個bulkloader.yaml文件,直接使用「appcfg.py upload_data」從終端 命令,我不建議運行這種方法有以下幾個原因:1.巨大的延遲2.不支持NDB
方法2:GCS和使用mapreduce
上傳數據文件GCS:
使用「storage-file-transfer-json-python」的GitHub項目(chunked_transfer.py)從您的本地系統文件上傳到GCS。 確保從應用引擎管理控制檯生成適當的「client-secrets.json」文件。
的MapReduce:
使用 「appengine-mapreduce」 github上的項目。將「mapreduce」文件夾複製到您的項目頂層文件夾中。
下面的行添加到您的app.yaml文件:
includes:
- mapreduce/include.yaml
下面是你的main.py文件
import cgi
import webapp2
import logging
import os, csv
from models import DataStoreModel
import StringIO
from google.appengine.api import app_identity
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce.input_readers import InputReader
def testmapperFunc(newRequest):
f = StringIO.StringIO(newRequest)
reader = csv.reader(f, delimiter=',')
for row in reader:
newEntry = DataStoreModel(attr1=row[0], link=row[1])
yield op.db.Put(newEntry)
class TestGCSReaderPipeline(base_handler.PipelineBase):
def run(self, filename):
yield mapreduce_pipeline.MapreducePipeline(
"test_gcs",
"testgcs.testmapperFunc",
"mapreduce.input_readers.FileInputReader",
mapper_params={
"files": [filename],
"format": 'lines'
},
shards=1)
class tempTestRequestGCSUpload(webapp2.RequestHandler):
def get(self):
bucket_name = os.environ.get('BUCKET_NAME',
app_identity.get_default_gcs_bucket_name())
bucket = '/gs/' + bucket_name
filename = bucket + '/' + 'tempfile.csv'
pipeline = TestGCSReaderPipeline(filename)
pipeline.with_params(target="mapreducetestmodtest")
pipeline.start()
self.response.out.write('done')
application = webapp2.WSGIApplication([
('/gcsupload', tempTestRequestGCSUpload),
], debug=True)
要記住:
- MapReduce的項目用途現已被棄用的「Google Cloud Storage Files API」。所以未來的支持是不能保證的。
- 映射reduce爲數據存儲讀取和寫入增加了一個小開銷。
方法3:GCS和GCS客戶端庫
- 載CSV /文本文件以使用上述文件傳輸方法GCS。
- 使用gcs客戶端庫(將'cloudstorage'文件夾複製到您的應用程序頂層文件夾中)。
將以下代碼添加到應用程序main.py文件中。
import cgi
import webapp2
import logging
import jinja2
import os, csv
import cloudstorage as gcs
from google.appengine.ext import ndb
from google.appengine.api import app_identity
from models import DataStoreModel
class UploadGCSData(webapp2.RequestHandler):
def get(self):
bucket_name = os.environ.get('BUCKET_NAME',
app_identity.get_default_gcs_bucket_name())
bucket = '/' + bucket_name
filename = bucket + '/tempfile.csv'
self.upload_file(filename)
def upload_file(self, filename):
gcs_file = gcs.open(filename)
datareader = csv.reader(gcs_file)
count = 0
entities = []
for row in datareader:
count += 1
newProd = DataStoreModel(attr1=row[0], link=row[1])
entities.append(newProd)
if count%50==0 and entities:
ndb.put_multi(entities)
entities=[]
if entities:
ndb.put_multi(entities)
application = webapp2.WSGIApplication([
('/gcsupload', UploadGCSData),
], debug=True)
方法3將超時處理大量數據,除非您拆分BIG CSV。 – 2015-03-12 18:54:45
@ sh4dydud3_88是的。這就是爲什麼我更喜歡大量數據的mapreduce方法的原因。另外,將CSV分割成每個CSV 20k個實體的塊也可以很好地工作。 – Sriram 2015-03-17 08:55:47
當'gsutil'使得它像終端中的'cp'命令一樣簡單時,使用python腳本將文件上傳到GCS似乎有點愚蠢。 – the0ther 2017-02-23 15:04:07