Objectify transactions and multithreaded tasks not working as expected in local dev-server
I am facing a problem when using Objectify transactions and GAE tasks on the local App Engine dev server.
What I want to do is process a huge CSV file (parsing the data and writing it somewhere else) in parallel. To do so, a first servlet takes care of the following:
- It stores a Job entity on the Datastore, which identifies the entire work unit and keeps track of total chunks and processed chunks.
- It splits the big CSV into smaller pieces. For each one, it stores a Chunk entity on the Datastore with the Job as its parent (using the @Parent annotation).
- Once the splitting completes, the servlet launches one task per chunk created.
Every task takes care of processing its own smaller CSV chunk.
Now comes the problem. In order to detect when all the chunks have been correctly processed, I want the last one to spawn another task that finalizes some work.
To do so, the idea is to use a transaction to ensure that every completed chunk increments a counter on the Job instance, so that the last one can launch the final task.
However, this doesn't work. I've tried with a Job divided into 7 chunks, and it seems that the last two chunks are processed concurrently (which is good!), but both of their transactions succeed (with different timings). Both tasks start executing when the number of processed chunks is 5, and both try to set it to 6. What I expect is that, on exiting the transaction, one of them fails, so that only one is able to update the processedChunks value on the parent object. The failed one would then be retried, and on the retry the chunk count would start from 6 and not from 5.
Here is the piece of code that is executed.
// import static com.googlecode.objectify.ObjectifyService.ofy;
// ...
// ...
// Perform the data manipulation on the Chunk
// ...
// Load the parent job from Datastore
// job = ...
// Check if there still are valid chunks to process. If not, we are done!
ImportJob fctUpdatedJob = ofy().transact(() -> {
    long threadId = Thread.currentThread().getId();
    log.info("T{}: Transaction started", threadId);
    ImportJob updatedJob = ofy().load().key(Key.create(ImportJob.class, job.getId())).now();
    log.info("T{}: loaded job {}", threadId, updatedJob);
    int processedChunks = updatedJob.getProcessedChunks() + 1;
    updatedJob.setProcessedChunks(processedChunks);
    updatedJob.setTotalChunkSeconds(updatedJob.getTotalChunkSeconds() + seconds);
    // TODO Double check this stop condition
    if (processedChunks == updatedJob.getTotalChunks() && !updatedJob.isDisposing()) {
        updatedJob.setDisposing(true);
    }
    ofy().save().entity(updatedJob).now();
    log.info("T{}: job saved. Job: {}", threadId, updatedJob);
    return updatedJob;
});
if (job.getProcessedChunks() == job.getTotalChunks()) {
    // We are done!
    // Launch the final task
}
The following is the output (only the relevant part with Thread indications):
INFO: T18: Transaction started
INFO: T18: loaded job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=0, totalChunks=7, totalChunkSeconds=0)
INFO: T18: job saved. Job: Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=1, totalChunks=7, totalChunkSeconds=47)
INFO: Processed 1 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=1, totalChunks=7, totalChunkSeconds=47)
INFO: T13: Transaction started
INFO: T13: loaded job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=1, totalChunks=7, totalChunkSeconds=47)
INFO: T13: job saved. Job: Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=2, totalChunks=7, totalChunkSeconds=98)
INFO: Processed 2 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=2, totalChunks=7, totalChunkSeconds=98)
INFO: T17: Transaction started
INFO: T17: loaded job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=2, totalChunks=7, totalChunkSeconds=98)
INFO: T17: job saved. Job: Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=3, totalChunks=7, totalChunkSeconds=151)
INFO: Processed 3 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=3, totalChunks=7, totalChunkSeconds=151)
INFO: T14: Transaction started
INFO: T14: loaded job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=3, totalChunks=7, totalChunkSeconds=151)
INFO: T14: job saved. Job: Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=4, totalChunks=7, totalChunkSeconds=203)
INFO: Processed 4 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=4, totalChunks=7, totalChunkSeconds=203)
INFO: T19: Transaction started
INFO: T19: loaded job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=4, totalChunks=7, totalChunkSeconds=203)
INFO: T19: job saved. Job: Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: Processed 5 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: T22: Transaction started
INFO: T22: loaded job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: T22: job saved. Job: Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=319)
INFO: Processed 6 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=319)
INFO: T20: Transaction started
INFO: T20: loaded job Job(year=2018, tempSpreadsheetId=1a924P9sXQYXV1Arrv7ak0FLBb2sRcclX6fqNQIFw8m0, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: T20: job saved. Job: Job(year=2018, tempSpreadsheetId=1a924P9sXQYXV1Arrv7ak0FLBb2sRcclX6fqNQIFw8m0, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=304)
INFO: Processed 6 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1a924P9sXQYXV1Arrv7ak0FLBb2sRcclX6fqNQIFw8m0, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=304)
As you can see, T22 loads the job with 5 processed chunks and saves it with 6, but then T20 also loads it with 5 and saves it with 6. Thus, the condition processedChunks == totalChunks is never met.
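To make the expected behaviour concrete, here is a minimal in-memory sketch of an optimistic, version-checked commit. It is plain Java and not Objectify or Datastore code: the class OptimisticCounterSketch, the JobState snapshot, and the commit and incrementWithRetry methods are all hypothetical names invented for illustration, and an AtomicReference stands in for the stored entity. It only shows the semantics being asked for: two tasks read the same state, the first commit wins, the second is rejected and has to retry from fresh state.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-memory stand-in for a versioned Job entity (not Objectify code). */
final class OptimisticCounterSketch {

    /** Immutable snapshot of the job: a version stamp plus the counter. */
    static final class JobState {
        final long version;
        final int processedChunks;

        JobState(long version, int processedChunks) {
            this.version = version;
            this.processedChunks = processedChunks;
        }
    }

    // Stand-in for the stored entity.
    private final AtomicReference<JobState> store =
            new AtomicReference<>(new JobState(0, 0));

    JobState load() {
        return store.get();
    }

    /** Succeeds only if nobody has committed since 'read' was loaded. */
    boolean commit(JobState read, JobState updated) {
        return store.compareAndSet(read, updated);
    }

    /** Read-modify-write that retries on conflict; returns the new count. */
    int incrementWithRetry() {
        while (true) {
            JobState read = load();
            JobState updated = new JobState(read.version + 1, read.processedChunks + 1);
            if (commit(read, updated)) {
                return updated.processedChunks;
            }
        }
    }

    public static void main(String[] args) {
        OptimisticCounterSketch job = new OptimisticCounterSketch();
        for (int i = 0; i < 5; i++) {
            job.incrementWithRetry(); // five chunks already processed
        }

        // Two tasks load the same snapshot (processedChunks = 5).
        JobState seenByT22 = job.load();
        JobState seenByT20 = job.load();

        boolean t22 = job.commit(seenByT22, new JobState(seenByT22.version + 1, 6));
        boolean t20 = job.commit(seenByT20, new JobState(seenByT20.version + 1, 6));
        System.out.println("T22 committed: " + t22 + ", T20 committed: " + t20);
        // prints "T22 committed: true, T20 committed: false"

        // The rejected task retries from fresh state and reaches 7.
        System.out.println("after T20 retry: " + job.incrementWithRetry());
        // prints "after T20 retry: 7"
    }
}
```

In the log above, by contrast, both commits appear to succeed, which is the lost update this sketch rules out.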
I am using objectify version 5.1.6, on the local AppEngine dev server and in my appengine-web.xml I have enabled the option.
What am I missing here?
java google-app-engine google-cloud-datastore objectify
Your transactions are not idempotent (github.com/objectify/objectify/wiki/Transactions#idempotence); simply incrementing a counter won't work every time. Do you cache your ImportJob entity? If so, you may want to turn the cache off: ofy().cache(false).load()
– Tijkijiki
Dec 28 '18 at 14:23
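One way to read the idempotence point in the comment above is to record which chunks have finished instead of incrementing a counter, so that a retried task cannot be counted twice. The following is a minimal in-memory sketch under that assumption. It is plain Java rather than Objectify code: the class IdempotentCompletionSketch and its methods are hypothetical, and the synchronized keyword merely stands in for whatever transaction would guard the real entity.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: completion tracked by chunk id, so retries are harmless. */
final class IdempotentCompletionSketch {

    private final int totalChunks;
    private final Set<Long> completedChunkIds = new HashSet<>();

    IdempotentCompletionSketch(int totalChunks) {
        this.totalChunks = totalChunks;
    }

    /**
     * Marks a chunk as done. Returns true only for the single call that
     * completes the job; repeating a call for the same chunk changes nothing.
     * 'synchronized' stands in for a real transaction here.
     */
    synchronized boolean markCompleted(long chunkId) {
        boolean firstTime = completedChunkIds.add(chunkId);
        return firstTime && completedChunkIds.size() == totalChunks;
    }

    synchronized int processedChunks() {
        return completedChunkIds.size();
    }

    public static void main(String[] args) {
        IdempotentCompletionSketch job = new IdempotentCompletionSketch(7);
        for (long id = 1; id <= 6; id++) {
            job.markCompleted(id);
        }

        // A retried task for chunk 6 does not inflate the count.
        job.markCompleted(6);
        System.out.println("processed after retry: " + job.processedChunks());
        // prints "processed after retry: 6"

        // Only the call that finishes the last chunk reports completion.
        System.out.println("last chunk completes job: " + job.markCompleted(7));
        // prints "last chunk completes job: true"
    }
}
```

Because the count is derived from the set of chunk ids, running the same task twice gives the same result as running it once, and the final task would be launched by exactly one call.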
add a comment |
I am facing a problem when using objectify transactions and GAE tasks on the local app-engine dev-server.
What I want to do is to process a huge csv file (parsing data in and writing it somewhere else) in a parallel manner. To do so, a first servlet takes care of:
- Store a Job entity on the Datastore, that identifies the entire work unit and takes track of total chunks and processed chunks.
- Split the big csv into smaller pieces. For each one, stores a Chunk entity on the Datastore having the Job as its parent (using @parent annotation)
- Once the splitting completes, the servlet launches as many tasks as the chunks that have been created.
Every task takes care of processing its smaller csv chunk object.
Now, it comes the problem. In order to identify when all the chunks have been correctly processed, I want the last one to spawn another task, that finalizes some work.
To do so, the idea is to use a transaction in order to ensure that the every completed chunk processing increments a counter on the Job instance, so that the last one can launch the final task.
However, this doesn't work. I've tried with a Job divided in 7 chunks and it seems that the last two chunks are processed concurrently (which is good!), but both of them succeed (with different timings). As you can imagine, both tasks start executing when the number of processed chunks is 5, and both try to set 6 as processed chunks. What I expect to happen is that, when exiting the transaction, one of them fails, and ony one should be able to update the processedChunks value on the parent object. So, the next time it is retried, the chunk count starts from 6 and not from 5.
So, for your convenience, here it is the piece of code that is executed.
// import static com.googlecode.objectify.ObjectifyService.ofy;
// ...
// ...
// Perform the data manipulation on the Chunk
// ...
// Load the parent job from Datastore
// job = ...
// Check if there still are valid chunks to process. If not, we are done!
ImportJob fctUpdatedJob = ofy().transact(() -> {
long threadId = Thread.currentThread().getId();
log.info("T{}: Transaction started", threadId);
ImportJob updatedJob = ofy().load().key(Key.create(ImportJob.class, job.getId())).now();
log.info("T{}: loaded job {}", threadId, updatedJob);
int processedChunks = updatedJob.getProcessedChunks() + 1;
updatedJob.setProcessedChunks(processedChunks);
updatedJob.setTotalChunkSeconds(updatedJob.getTotalChunkSeconds() + seconds);
// TODO Double check this stop condition
if (processedChunks == updatedJob.getTotalChunks() && !updatedJob.isDisposing()) {
updatedJob.setDisposing(true);
}
ofy().save().entity(updatedJob).now();
log.info("T{}: job saved. Job: {}", threadId, updatedJob);
return updatedJob;
});
if (job.getProcessedChunks()==job.getTotalChunks()) {
// We are done!
// Launch the final task
}
The following is the output (only the relevant part with Thread indications):
INFO: T18: Transaction started
INFO: T18: loaded job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=0, totalChunks=7, totalChunkSeconds=0)
INFO: T18: job saved. Job: Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=1, totalChunks=7, totalChunkSeconds=47)
INFO: Processed 1 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=1, totalChunks=7, totalChunkSeconds=47)
INFO: T13: Transaction started
INFO: T13: loaded job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=1, totalChunks=7, totalChunkSeconds=47)
INFO: T13: job saved. Job: Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=2, totalChunks=7, totalChunkSeconds=98)
INFO: Processed 2 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1Yyf-4CufdK-34zQ3VAhaQLFUHfy0zj98OnHnj7wwYRE, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=2, totalChunks=7, totalChunkSeconds=98)
INFO: T17: Transaction started
INFO: T17: loaded job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=2, totalChunks=7, totalChunkSeconds=98)
INFO: T17: job saved. Job: Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=3, totalChunks=7, totalChunkSeconds=151)
INFO: Processed 3 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=3, totalChunks=7, totalChunkSeconds=151)
INFO: T14: Transaction started
INFO: T14: loaded job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=3, totalChunks=7, totalChunkSeconds=151)
INFO: T14: job saved. Job: Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=4, totalChunks=7, totalChunkSeconds=203)
INFO: Processed 4 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1px7iKzxlEwr2hczF6NM9m-1HyRMYJ4GhZ20Ss4zCzqA, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=4, totalChunks=7, totalChunkSeconds=203)
INFO: T19: Transaction started
INFO: T19: loaded job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=4, totalChunks=7, totalChunkSeconds=203)
INFO: T19: job saved. Job: Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: Processed 5 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: T22: Transaction started
INFO: T22: loaded job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: T22: job saved. Job: Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=319)
INFO: Processed 6 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1tqa8inL6H94I5Wi9z6UWCoy4C8XDDxltod7afLhCU6o, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=319)
INFO: T20: Transaction started
INFO: T20: loaded job Job(year=2018, tempSpreadsheetId=1a924P9sXQYXV1Arrv7ak0FLBb2sRcclX6fqNQIFw8m0, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=5, totalChunks=7, totalChunkSeconds=250)
INFO: T20: job saved. Job: Job(year=2018, tempSpreadsheetId=1a924P9sXQYXV1Arrv7ak0FLBb2sRcclX6fqNQIFw8m0, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=304)
INFO: Processed 6 of 7 chunks for job Job(year=2018, tempSpreadsheetId=1a924P9sXQYXV1Arrv7ak0FLBb2sRcclX6fqNQIFw8m0, tempSplitFileFolder=tmp_chunks/fct/2018/5578921999335424, nuvolaTempSpreadsheetId=null, nuvolaTemporaryTable=null, canceled=false, disposing=false, processedChunks=6, totalChunks=7, totalChunkSeconds=304)
As you can see, T22 starts with 5 processed chunks, saves it to 6 but then T20 starts again from 5 and saves it to 6. Thus, the condition processedChunks == totalChunks never occurs.
I am using objectify version 5.1.6, on the local AppEngine dev server and in my appengine-web.xml I have enabled the option.
What am I missing here?
java google-app-engine google-cloud-datastore objectify
asked Nov 24 '18 at 12:52
Alberto Geniola
Your transactions are not idempotent (github.com/objectify/objectify/wiki/Transactions#idempotence); simply incrementing won't work every time. Do you cache your ImportJob entity? If so, you may want to turn caching off: ofy().cache(false).load()
– Tijkijiki
Dec 28 '18 at 14:23
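The idempotence point raised in this comment can be illustrated with a small sketch. A blind `processedChunks + 1` counts a chunk twice if its task or transaction is retried, whereas recording which chunks have finished makes a repeated call harmless. The example below is a plain in-memory illustration under that assumption, not Objectify code; `IdempotentJob`, `markCompleted` and the use of numeric chunk ids are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory job; a real entity would be loaded and saved inside the transaction.
class IdempotentJob {
    private final int totalChunks;
    private final Set<Long> completedChunkIds = new HashSet<>();

    IdempotentJob(int totalChunks) {
        this.totalChunks = totalChunks;
    }

    // Records the chunk id instead of incrementing a counter, so a retried
    // call for the same chunk changes nothing.
    // Returns true only for the call that completes the last distinct chunk.
    synchronized boolean markCompleted(long chunkId) {
        boolean newlyAdded = completedChunkIds.add(chunkId);
        return newlyAdded && completedChunkIds.size() == totalChunks;
    }

    synchronized int processedChunks() {
        return completedChunkIds.size();
    }

    public static void main(String[] args) {
        IdempotentJob job = new IdempotentJob(3);
        System.out.println(job.markCompleted(1L)); // false
        System.out.println(job.markCompleted(1L)); // false: the retry of chunk 1 is not counted again
        System.out.println(job.markCompleted(2L)); // false
        System.out.println(job.markCompleted(3L)); // true: the last distinct chunk finishes the job
        System.out.println(job.markCompleted(3L)); // false: a retry would not launch the final task twice
        System.out.println(job.processedChunks()); // 3
    }
}
```

The design choice here is to derive the count from the set of finished chunks, so the "last chunk" signal fires exactly once no matter how often an individual chunk is retried.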